Skip to content

Commit

Permalink
Fix for schedule refreshing bug (#1144)
Browse files Browse the repository at this point in the history
* Adjusted logs for better msg correlation.

* Fixes #1143.

* Selective backoff in path worker.

* Fmt & clippy

* Adjusted the backoff mechanism to depend on current chan length

* Apply @romac's suggestion

* Changelog entry
  • Loading branch information
adizere authored Jul 7, 2021
1 parent 3da7dc1 commit 05afbfe
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@ reliability of Hermes ([#697]).
- Update to `tendermint-rs` v0.20.0 ([#1125])
- Add inline documentation to config.toml ([#1127])

### BUG FIXES

- [ibc-relayer]
- Fix for schedule refreshing bug ([#1143])


[#600]: https://github.com/informalsystems/ibc-rs/issues/600
[#697]: https://github.com/informalsystems/ibc-rs/issues/697
[#1062]: https://github.com/informalsystems/ibc-rs/issues/1062
[#1057]: https://github.com/informalsystems/ibc-rs/issues/1057
[#1125]: https://github.com/informalsystems/ibc-rs/issues/1125
[#1127]: https://github.com/informalsystems/ibc-rs/issues/1127
[#1140]: https://github.com/informalsystems/ibc-rs/issues/1140
[#1143]: https://github.com/informalsystems/ibc-rs/issues/1143


## v0.5.0
Expand Down
37 changes: 24 additions & 13 deletions relayer/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,10 +598,11 @@ impl RelayPath {

for i in 0..MAX_RETRIES {
info!(
"[{}] relay op. data to {}, proofs height {}, (delayed by: {:?}) [try {}/{}]",
"[{}] relay op. data of {} msgs(s) to {} (height {}), delayed by: {:?} [try {}/{}]",
self,
odata.batch.len(),
odata.target,
odata.proofs_height,
odata.proofs_height.increment(),
odata.scheduled_time.elapsed(),
i + 1,
MAX_RETRIES
Expand Down Expand Up @@ -1326,11 +1327,17 @@ impl RelayPath {
IbcEvent::SendPacket(e) => {
// Catch any SendPacket event that timed-out
if self.send_packet_event_handled(e)? {
debug!("[{}] {} already handled", self, e);
debug!(
"[{}] refreshing schedule: already handled send packet {}",
self, e
);
} else if let Some(new_msg) =
self.build_timeout_from_send_packet_event(e, dst_current_height)?
{
debug!("[{}] found a timed-out msg in the op data {}", self, odata);
debug!(
"[{}] refreshing schedule: found a timed-out msg in the op data {}",
self, odata
);
timed_out.entry(odata_pos).or_insert_with(Vec::new).push(
TransitMessage {
event: event.clone(),
Expand All @@ -1344,7 +1351,10 @@ impl RelayPath {
}
IbcEvent::WriteAcknowledgement(e) => {
if self.write_ack_event_handled(e)? {
debug!("[{}] {} already handled", self, e);
debug!(
"[{}] refreshing schedule: already handled {} write ack ",
self, e
);
} else {
retain_batch.push(gm.clone());
}
Expand All @@ -1357,6 +1367,13 @@ impl RelayPath {
odata.batch = retain_batch;
}

// Replace the original operational data with the updated one
self.dst_operational_data = all_dst_odata;
// Possibly some op. data became empty (if no events were kept).
// Retain only the non-empty ones.
self.dst_operational_data.retain(|o| !o.batch.is_empty());

// Handle timed-out events
if timed_out.is_empty() {
// Nothing timed out in the meantime
return Ok(());
Expand All @@ -1370,20 +1387,14 @@ impl RelayPath {
new_od.batch = batch;

info!(
"[{}] re-scheduling from new timed-out batch of size {}",
"[{}] refreshing schedule: re-scheduling from new timed-out batch of size {}",
self,
new_od.batch.len()
);

self.schedule_operational_data(new_od)?;
}

self.dst_operational_data = all_dst_odata;

// Possibly some op. data became empty (if no events were kept).
// Retain only the non-empty ones.
self.dst_operational_data.retain(|o| !o.batch.is_empty());

Ok(())
}

Expand All @@ -1400,7 +1411,7 @@ impl RelayPath {
}

info!(
"[{}] scheduling op. data with {} msg(s) for {} chain (height {})",
"[{}] scheduling op. data with {} msg(s) for {} (height {})",
self,
od.batch.len(),
od.target,
Expand Down
2 changes: 1 addition & 1 deletion relayer/src/worker/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl WorkerHandle {
Ok(())
}

/// Send a batch of [`NewBlock`] event to the worker.
/// Send a [`NewBlock`] event to the worker.
pub fn send_new_block(&self, height: Height, new_block: NewBlock) -> Result<(), BoxError> {
self.tx.send(WorkerCmd::NewBlock { height, new_block })?;
Ok(())
Expand Down
13 changes: 10 additions & 3 deletions relayer/src/worker/uni_chan_path.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{thread, time::Duration};
use std::time::Duration;

use anomaly::BoxError;
use crossbeam_channel::Receiver;
Expand Down Expand Up @@ -57,10 +57,17 @@ impl UniChanPathWorker {
}

loop {
thread::sleep(Duration::from_millis(200));
const BACKOFF: Duration = Duration::from_millis(200);

// Pop-out any unprocessed commands
// If there are no incoming commands, it's safe to backoff.
let maybe_cmd = crossbeam_channel::select! {
recv(self.cmd_rx) -> cmd => cmd.ok(),
recv(crossbeam_channel::after(BACKOFF)) -> _ => None,
};

let result = retry_with_index(retry_strategy::worker_default_strategy(), |index| {
Self::step(self.cmd_rx.try_recv().ok(), &mut link, index)
Self::step(maybe_cmd.clone(), &mut link, index)
});

match result {
Expand Down

0 comments on commit 05afbfe

Please sign in to comment.