Skip to content

Commit

Permalink
Messages relay fixes (paritytech#2073)
Browse files Browse the repository at this point in the history
* removed obsolete check that is superseded by the unblock checks below

* if messages race transaction submit has failed, do not restart loop. Instead, wait for new best nonces from target node and retry selection. That's because submit has probably failed because other relayer has submitted same nonces

* reset nonces_to_submit and nonces_submitted if at least one of selected/submitted nonces is already at target

* removed extra check
  • Loading branch information
svyatonik authored Apr 25, 2023
1 parent daf4c7d commit bc637a3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 52 deletions.
47 changes: 6 additions & 41 deletions relays/messages/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,38 +343,14 @@ where
// There's additional condition in the message delivery race: target would reject messages
// if there are too much unconfirmed messages at the inbound lane.

// The receiving race is responsible to deliver confirmations back to the source chain. So
// if there's a lot of unconfirmed messages, let's wait until it'll be able to do its job.
let latest_received_nonce_at_target = target_nonces.latest_nonce;
let confirmations_missing =
latest_received_nonce_at_target.checked_sub(latest_confirmed_nonce_at_source);
match confirmations_missing {
Some(confirmations_missing)
if confirmations_missing >= self.max_unconfirmed_nonces_at_target =>
{
log::debug!(
target: "bridge",
"Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \
at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}",
MessageDeliveryRace::<P>::source_name(),
MessageDeliveryRace::<P>::target_name(),
latest_received_nonce_at_target,
latest_confirmed_nonce_at_source,
self.max_unconfirmed_nonces_at_target,
);

return None
},
_ => (),
}

// Ok - we may have new nonces to deliver. But target may still reject new messages, because
// we haven't notified it that (some) messages have been confirmed. So we may want to
// include updated `source.latest_confirmed` in the proof.
//
// Important note: we're including outbound state lane proof whenever there are unconfirmed
// nonces on the target chain. Other strategy is to include it only if it's absolutely
// necessary.
let latest_received_nonce_at_target = target_nonces.latest_nonce;
let latest_confirmed_nonce_at_target = target_nonces.nonces_data.confirmed_nonce;
let outbound_state_proof_required =
latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source;
Expand Down Expand Up @@ -585,6 +561,11 @@ where
self.strategy.source_nonces_updated(at_block, nonces)
}

fn reset_best_target_nonces(&mut self) {
self.target_nonces = None;
self.strategy.reset_best_target_nonces();
}

fn best_target_nonces_updated<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
&mut self,
nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
Expand Down Expand Up @@ -808,22 +789,6 @@ mod tests {
);
}

#[async_std::test]
async fn message_delivery_strategy_selects_nothing_if_too_many_confirmations_missing() {
let (state, mut strategy) = prepare_strategy();

// if there are already `max_unconfirmed_nonces_at_target` messages on target,
// we need to wait until confirmations will be delivered by receiving race
strategy.latest_confirmed_nonces_at_source = vec![(
header_id(1),
strategy.target_nonces.as_ref().unwrap().latest_nonce -
strategy.max_unconfirmed_nonces_at_target,
)]
.into_iter()
.collect();
assert_eq!(strategy.select_nonces_to_deliver(state).await, None);
}

#[async_std::test]
async fn message_delivery_strategy_includes_outbound_state_proof_when_new_nonces_are_available()
{
Expand Down
23 changes: 20 additions & 3 deletions relays/messages/src/message_race_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
at_block: SourceHeaderId,
nonces: SourceClientNonces<Self::SourceNoncesRange>,
);
/// Called when we want to wait until next `best_target_nonces_updated` before selecting
/// any nonces for delivery.
fn reset_best_target_nonces(&mut self);
/// Called when best nonces are updated at target node of the race.
fn best_target_nonces_updated<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
&mut self,
Expand Down Expand Up @@ -542,14 +545,22 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
P::target_name(),
);

race_state.reset_nonces_to_submit();
race_state.nonces_submitted = Some(artifacts.nonces);
target_tx_tracker.set(artifacts.tx_tracker.wait().fuse());
},
&mut target_go_offline_future,
async_std::task::sleep,
|| format!("Error submitting proof {}", P::target_name()),
).fail_if_error(FailedClient::Target).map(|_| true)?;
).fail_if_connection_error(FailedClient::Target)?;

// in any case - we don't need to retry submitting the same nonces again until
// we read nonces from the target client
race_state.reset_nonces_to_submit();
// if we have failed to submit transaction AND that is not the connection issue,
// then we need to read best target nonces before selecting nonces again
if !target_client_is_online {
strategy.reset_best_target_nonces();
}
},
target_transaction_status = target_tx_tracker => {
match (target_transaction_status, race_state.nonces_submitted.as_ref()) {
Expand Down Expand Up @@ -699,7 +710,13 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
.fuse(),
);
} else if let Some(source_required_header) = source_required_header.clone() {
log::debug!(target: "bridge", "Going to require {} header {:?} at {}", P::source_name(), source_required_header, P::target_name());
log::debug!(
target: "bridge",
"Going to require {} header {:?} at {}",
P::source_name(),
source_required_header,
P::target_name(),
);
target_require_source_header
.set(race_target.require_source_header(source_required_header).fuse());
} else if target_best_nonces_required {
Expand Down
48 changes: 40 additions & 8 deletions relays/messages/src/message_race_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ impl<
)
}

fn reset_best_target_nonces(&mut self) {
self.best_target_nonce = None;
}

fn best_target_nonces_updated<
RS: RaceState<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
Expand All @@ -266,19 +270,37 @@ impl<
) {
let nonce = nonces.latest_nonce;

// if **some** of nonces that we have selected to submit already present at the
// target chain => select new nonces
let need_to_select_new_nonces = race_state
.nonces_to_submit()
.map(|nonces| *nonces.end() <= nonce)
.map(|nonces| nonce >= *nonces.start())
.unwrap_or(false);
if need_to_select_new_nonces {
log::trace!(
target: "bridge",
"Latest nonce at target is {}. Clearing nonces to submit: {:?}",
nonce,
race_state.nonces_to_submit(),
);

race_state.reset_nonces_to_submit();
}

// if **some** of nonces that we have submitted already present at the
// target chain => select new nonces
let need_new_nonces_to_submit = race_state
.nonces_submitted()
.map(|nonces| *nonces.end() <= nonce)
.map(|nonces| nonce >= *nonces.start())
.unwrap_or(false);
if need_new_nonces_to_submit {
log::trace!(
target: "bridge",
"Latest nonce at target is {}. Clearing submitted nonces: {:?}",
nonce,
race_state.nonces_submitted(),
);

race_state.reset_nonces_submitted();
}

Expand Down Expand Up @@ -415,21 +437,31 @@ mod tests {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None)));
strategy.best_target_nonces_updated(target_nonces(7), &mut state);
// we are going to submit 5..=10, so having latest nonce 4 at target is fine
strategy.best_target_nonces_updated(target_nonces(4), &mut state);
assert!(state.nonces_to_submit.is_some());
strategy.best_target_nonces_updated(target_nonces(10), &mut state);
assert!(state.nonces_to_submit.is_none());
// any nonce larger than 4 invalidates the `nonces_to_submit`
for nonce in 5..=11 {
state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None)));
strategy.best_target_nonces_updated(target_nonces(nonce), &mut state);
assert!(state.nonces_to_submit.is_none());
}
}

#[test]
fn submitted_nonces_are_dropped_on_target_nonce_update() {
let mut state = TestRaceStateImpl::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_submitted = Some(5..=10);
strategy.best_target_nonces_updated(target_nonces(7), &mut state);
// we have submitted 5..=10, so having latest nonce 4 at target is fine
strategy.best_target_nonces_updated(target_nonces(4), &mut state);
assert!(state.nonces_submitted.is_some());
strategy.best_target_nonces_updated(target_nonces(10), &mut state);
assert!(state.nonces_submitted.is_none());
// any nonce larger than 4 invalidates the `nonces_submitted`
for nonce in 5..=11 {
state.nonces_submitted = Some(5..=10);
strategy.best_target_nonces_updated(target_nonces(nonce), &mut state);
assert!(state.nonces_submitted.is_none());
}
}

#[async_std::test]
Expand Down

0 comments on commit bc637a3

Please sign in to comment.