
Fix execute_schedule method leaking operational data #2118

Merged 43 commits on May 27, 2022

Changes from 42 commits

Commits (43)
042480e
Pull in upstream changes
seanchen1991 Apr 22, 2022
69a1f8f
Merge branch 'informalsystems:master' into master
seanchen1991 Apr 22, 2022
83b60d7
Adding integration test for execute_schedule
seanchen1991 Apr 22, 2022
60edac7
Adding integration test for execute_schedule
seanchen1991 Apr 22, 2022
5b26c2f
Finish stubbing out execute_schedule test
seanchen1991 Apr 22, 2022
37130a3
Call `chains.shutdown` method
seanchen1991 Apr 25, 2022
c032b4f
Correctly shut down chain
seanchen1991 Apr 25, 2022
a561715
Shut down node b as well
seanchen1991 Apr 26, 2022
fe08ba3
Debugging execute_schedule test
seanchen1991 May 9, 2022
37d96b7
Add Debug derivations
seanchen1991 May 10, 2022
e58a6e1
Merge branch 'master' of https://github.com/informalsystems/ibc-rs in…
seanchen1991 May 10, 2022
a1267f4
Update integration test so that its flow correctly tests `execute_sch…
seanchen1991 May 11, 2022
1fc709d
Attempt to perform a second IBC transfer
seanchen1991 May 11, 2022
cfc7c70
Remove info's
seanchen1991 May 11, 2022
4c327b2
Increase sleep timeout duration
seanchen1991 May 11, 2022
40a3ef3
Merge branch 'master' of https://github.com/informalsystems/ibc-rs in…
seanchen1991 May 11, 2022
dbe86ca
Incorportate new test framework features
seanchen1991 May 11, 2022
ac8532d
Remove unnecessary `sleep` call
seanchen1991 May 11, 2022
c3b0858
Correctly use new test framework features
seanchen1991 May 11, 2022
7b50b16
Get assertions passing for now
seanchen1991 May 11, 2022
de2ea25
Send two transactions, one in each direction
seanchen1991 May 13, 2022
543cfa7
Add doc comment for test
seanchen1991 May 13, 2022
8cb0aca
Improve panic messages
seanchen1991 May 13, 2022
3ddc4ef
Refactor test so that it is actually testing the desired behavior
seanchen1991 May 20, 2022
dedce23
Attempt at fixing `execute_schedule` leaky logic
seanchen1991 May 20, 2022
a948c3f
Flesh out doc comments some more
seanchen1991 May 20, 2022
0321f0e
Remove a duplicate function
seanchen1991 May 20, 2022
142feea
Make use of OperationalDataTarget enum
seanchen1991 May 20, 2022
0dec734
Remove redundant enum
seanchen1991 May 20, 2022
ef2014c
Remove some Debug derives
seanchen1991 May 20, 2022
ef3f498
Remove one more debug derive
seanchen1991 May 20, 2022
5afe71e
Merge branch 'master' into execute-schedule-test
seanchen1991 May 20, 2022
e102db1
Merge branch 'master' of https://github.com/informalsystems/ibc-rs in…
seanchen1991 May 20, 2022
8a3ed6b
Merge branch 'execute-schedule-test' of github.com:seanchen1991/ibc-r…
seanchen1991 May 20, 2022
77040d8
Add `try_fetch_scheduled_operational_data` back in
seanchen1991 May 20, 2022
6a7bf0e
Give `do_execute_schedule` a more descriptive name
seanchen1991 May 20, 2022
402b2c0
Improve `execute_schedule_for_target_chain` method's documentation
seanchen1991 May 23, 2022
26f61d0
Add a bunch of clarifying comments
seanchen1991 May 24, 2022
58d78cf
More clarification of comments
seanchen1991 May 24, 2022
746d990
Flesh out `OperationalData` docs
seanchen1991 May 24, 2022
d4342fe
Add changelog entry
seanchen1991 May 24, 2022
b9e2af3
Incorporate PR feedback
seanchen1991 May 26, 2022
e4dedbe
Merge branch 'master' into execute-schedule-test
adizere May 27, 2022
@@ -0,0 +1,2 @@
- Fix `execute_schedule` method dropping operational data due to improper
handling of errors. ([#2118](https://github.com/informalsystems/ibc-rs/issues/1153))
21 changes: 14 additions & 7 deletions relayer/src/link/operational_data.rs
@@ -17,9 +17,12 @@ use crate::chain::tracking::TrackingId;
use crate::link::error::LinkError;
use crate::link::RelayPath;

/// The chain that the events associated with a piece of [`OperationalData`] are bound for.
#[derive(Clone, Copy, PartialEq)]
pub enum OperationalDataTarget {
/// The chain which generated the events associated with the `OperationalData`.
Source,
/// The chain receiving the events associated with the `OperationalData`.
Destination,
}

@@ -75,18 +78,22 @@ pub struct TransitMessage {
pub msg: Any,
}

-/// Holds all the necessary information for handling a set of in-transit messages.
-///
-/// Each `OperationalData` item is uniquely identified by the combination of two attributes:
-/// - `target`: represents the target of the packet messages, either source or destination chain,
-/// - `proofs_height`: represents the height for the proofs in all the messages.
-///   Note: this is the height at which the proofs are queried. A client consensus state at
-///   `proofs_height + 1` must exist on-chain in order to verify the proofs.
+/// Holds all the necessary information for handling a batch of in-transit messages. This includes
+/// an event received from a chain along with any other packets related to the event (i.e.
+/// 'receive' or 'timeout' packets) that the relayer has to submit in response to the event.
#[derive(Clone)]
pub struct OperationalData {
/// Represents the height for the proofs in all the messages. Note that this is the height
/// at which the proofs are queried. For example, for Tendermint chains, a client consensus
/// state at `proofs_height + 1` must exist on-chain in order to verify the proofs.
pub proofs_height: Height,
/// The batch of messages associated with this piece of operational data.
pub batch: Vec<TransitMessage>,
/// Represents the target of the packet messages, either the source or the destination
/// chain.
pub target: OperationalDataTarget,
/// A unique ID for tracking this batch of events starting from when they were received
/// until the transactions corresponding to those events are submitted.
pub tracking_id: TrackingId,
/// Stores `Some(ConnectionDelay)` if the delay is non-zero and `None` otherwise
connection_delay: Option<ConnectionDelay>,
132 changes: 113 additions & 19 deletions relayer/src/link/relay_path.rs
@@ -98,8 +98,8 @@ pub struct RelayPath<ChainA: ChainHandle, ChainB: ChainHandle> {
// mostly timeout packet messages.
// The operational data targeting the destination chain
// comprises mostly RecvPacket and Ack msgs.
-pub(crate) src_operational_data: Queue<OperationalData>,
-pub(crate) dst_operational_data: Queue<OperationalData>,
+pub src_operational_data: Queue<OperationalData>,
+pub dst_operational_data: Queue<OperationalData>,

// Toggle for the transaction confirmation mechanism.
confirm_txes: bool,
@@ -391,7 +391,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
for i in 1..=MAX_RETRIES {
let cleared = self
.schedule_recv_packet_and_timeout_msgs(height, tracking_id)
-.and_then(|()| self.schedule_packet_ack_msgs(height, tracking_id));
+.and_then(|_| self.schedule_packet_ack_msgs(height, tracking_id));

match cleared {
Ok(()) => return Ok(()),
@@ -406,6 +406,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}

/// Clears any packets that were sent before `height`.
/// If no height is passed in, then the latest height of the source chain is used.
pub fn schedule_packet_clearing(&self, height: Option<Height>) -> Result<(), LinkError> {
let span = span!(Level::DEBUG, "clear");
let _enter = span.enter();
@@ -1278,27 +1279,120 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}
}

-/// Checks if there are any operational data items ready,
-/// and if so performs the relaying of corresponding packets
-/// to the target chain.
-///
-/// This method performs relaying using the asynchronous sender.
-/// Retains the operational data as pending, and associates it
-/// with one or more transaction hash(es).
-pub fn execute_schedule(&self) -> Result<(), LinkError> {
-    let (src_ods, dst_ods) = self.try_fetch_scheduled_operational_data()?;
/// Drives forward the relaying of elapsed operational data items bound for
/// a specified target chain.
///
/// Given an iterator of `OperationalData` elements, this function
/// first determines whether the current piece of operational data
/// has elapsed.
///
/// A piece of operational data is considered 'elapsed' if it has been waiting
/// for an amount of time that surpasses both of the following:
/// 1. The time duration specified in the connection delay
/// 2. The number of blocks specified in the connection delay
///
/// If the current piece of operational data has elapsed, then relaying
/// is performed using the asynchronous sender. Operational data is
/// retained as pending and is associated with one or more transaction
/// hash(es).
///
/// Should an error occur when attempting to relay a piece of operational
/// data, this function returns all subsequent unprocessed pieces of
/// operational data back to the caller so that they can be re-queued
/// for processing; the operational data that failed to send is dropped.
///
/// Note that pieces of operational data that have not elapsed yet are
/// also placed in the 'unprocessed' bucket.
fn execute_schedule_for_target_chain<I: Iterator<Item = OperationalData>>(
&mut self,
mut operations: I,
target_chain: OperationalDataTarget,
) -> Result<VecDeque<OperationalData>, (VecDeque<OperationalData>, LinkError)> {
let mut unprocessed = VecDeque::new();

while let Some(od) = operations.next() {
let elapsed_result = match target_chain {
OperationalDataTarget::Source => od.has_conn_delay_elapsed(
&|| self.src_time_latest(),
&|| self.src_max_block_time(),
&|| self.src_latest_height(),
),
OperationalDataTarget::Destination => od.has_conn_delay_elapsed(
&|| self.dst_time_latest(),
&|| self.dst_max_block_time(),
&|| self.dst_latest_height(),
),
};

-for od in dst_ods {
-    let reply =
-        self.relay_from_operational_data::<relay_sender::AsyncSender>(od.clone())?;
match elapsed_result {
Ok(elapsed) => {
if elapsed {
// The current piece of operational data has elapsed; we can go ahead and
// attempt to relay it.
match self
.relay_from_operational_data::<relay_sender::AsyncSender>(od.clone())
{
// The operational data was successfully relayed; enqueue the associated tx.
Ok(reply) => self.enqueue_pending_tx(reply, od),
// The relaying process failed; return all of the subsequent pieces of operational
// data along with the underlying error that occurred.
Err(e) => {
unprocessed.extend(operations);

return Err((unprocessed, e));
}
}
} else {
// The current piece of operational data has not elapsed; add it to the bucket
// of unprocessed operational data and continue processing subsequent pieces
// of operational data.
unprocessed.push_back(od);
}
}
Err(e) => {
// An error occurred when attempting to determine whether the current piece of
// operational data has elapsed or not. Add the current piece of data, along with
// all of the subsequent pieces of data, to the unprocessed bucket and return it
// along with the error that resulted.
unprocessed.push_back(od);
unprocessed.extend(operations);

return Err((unprocessed, e));
}
}
}

-self.enqueue_pending_tx(reply, od);
Ok(unprocessed)
}

/// While there are pending operational data items, this function
/// performs the relaying of packets corresponding to those
/// operational data items to both the source and destination chains.
///
/// Any operational data items that do not get successfully relayed are
/// dropped. Subsequent pending operational data items that went unprocessed
/// are queued up again for re-submission.
pub fn execute_schedule(&mut self) -> Result<(), LinkError> {
let src_od_iter = self.src_operational_data.take().into_iter();

match self.execute_schedule_for_target_chain(src_od_iter, OperationalDataTarget::Source) {
Ok(unprocessed_src_data) => self.src_operational_data = unprocessed_src_data.into(),
Err((unprocessed_src_data, e)) => {
self.src_operational_data = unprocessed_src_data.into();
return Err(e);
}
}

-for od in src_ods {
-    let reply =
-        self.relay_from_operational_data::<relay_sender::AsyncSender>(od.clone())?;
-    self.enqueue_pending_tx(reply, od);
let dst_od_iter = self.dst_operational_data.take().into_iter();

match self
.execute_schedule_for_target_chain(dst_od_iter, OperationalDataTarget::Destination)
{
Ok(unprocessed_dst_data) => self.dst_operational_data = unprocessed_dst_data.into(),
Err((unprocessed_dst_data, e)) => {
self.dst_operational_data = unprocessed_dst_data.into();
return Err(e);
}
}

Ok(())
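The heart of the fix above is the error path: instead of `?`-propagating the first failure (which silently dropped every item still in the queue), the failed item's successors are drained into an `unprocessed` deque and handed back to the caller for re-queueing. A standalone sketch of that pattern, with `u32` and `String` as hypothetical stand-ins for `OperationalData` and `LinkError`:

```rust
use std::collections::VecDeque;

// Hypothetical stand-ins for `OperationalData` and `LinkError`.
type OpData = u32;
type LinkError = String;

/// Processes items one at a time. On the first failure, the failing item is
/// dropped and every *subsequent* item is returned so the caller can re-queue
/// it, mirroring the shape of `execute_schedule_for_target_chain`.
fn process_all<I>(
    mut items: I,
    fails_on: OpData,
) -> Result<VecDeque<OpData>, (VecDeque<OpData>, LinkError)>
where
    I: Iterator<Item = OpData>,
{
    let mut unprocessed = VecDeque::new();
    while let Some(od) = items.next() {
        if od == fails_on {
            // "Relaying" failed: drain the remainder and hand it back
            // alongside the error instead of dropping it.
            unprocessed.extend(items);
            return Err((unprocessed, format!("failed to relay item {od}")));
        }
        // Successful relay: nothing to retain for this item.
    }
    Ok(unprocessed)
}
```

Failing on item 2 of `0..5` drops item 2 but preserves items 3 and 4 for re-submission, which is the same invariant the integration test below asserts against the real queue.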
6 changes: 6 additions & 0 deletions relayer/src/util/queue.rs
@@ -63,3 +63,9 @@ impl<T> Default for Queue<T> {
Self::new()
}
}

impl<T> From<VecDeque<T>> for Queue<T> {
fn from(deque: VecDeque<T>) -> Self {
Queue(Arc::new(RwLock::new(deque)))
}
}
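This new `From` impl is what lets `execute_schedule` convert the returned `VecDeque` of unprocessed items back into the shared queue via `unprocessed.into()`. A self-contained sketch of the conversion, where this `Queue` is a minimal stand-in for the relayer's type (the real one carries additional helpers):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};

/// Minimal stand-in for the relayer's `Queue<T>`: a shared, lockable deque.
pub struct Queue<T>(Arc<RwLock<VecDeque<T>>>);

impl<T> From<VecDeque<T>> for Queue<T> {
    fn from(deque: VecDeque<T>) -> Self {
        // Wrap the deque so the queue can be cloned across workers and
        // mutated behind the read-write lock.
        Queue(Arc::new(RwLock::new(deque)))
    }
}

impl<T> Queue<T> {
    /// Number of items currently queued.
    pub fn len(&self) -> usize {
        self.0.read().unwrap().len()
    }
}
```

With this in place, `self.src_operational_data = unprocessed_src_data.into()` reinstalls the surviving items in one move rather than pushing them back one at a time.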
6 changes: 3 additions & 3 deletions relayer/src/worker/packet.rs
@@ -73,7 +73,7 @@ pub fn spawn_packet_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
};

spawn_background_task(span, Some(Duration::from_millis(1000)), move || {
-let relay_path = &link.lock().unwrap().a_to_b;
+let relay_path = &mut link.lock().unwrap().a_to_b;

relay_path
.refresh_schedule()
@@ -120,7 +120,7 @@ pub fn spawn_packet_cmd_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
retry_with_index(retry_strategy::worker_stubborn_strategy(), |index| {
handle_packet_cmd(
&mut is_first_run,
-&link.lock().unwrap(),
+&mut link.lock().unwrap(),
clear_on_start,
clear_interval,
&path,
@@ -145,7 +145,7 @@ pub fn spawn_packet_cmd_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
/// data that is ready.
fn handle_packet_cmd<ChainA: ChainHandle, ChainB: ChainHandle>(
is_first_run: &mut bool,
-link: &Link<ChainA, ChainB>,
+link: &mut Link<ChainA, ChainB>,
clear_on_start: bool,
clear_interval: u64,
path: &Packet,
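These worker signature changes follow from `execute_schedule` now taking `&mut self`: the packet worker must thread mutable access through the `Mutex` guard. A sketch of that borrow pattern, with simplified stand-in `Link` and `RelayPath` structs:

```rust
use std::sync::Mutex;

// Simplified stand-ins for the relayer's `Link` and `RelayPath`.
struct RelayPath {
    executions: u64,
}

struct Link {
    a_to_b: RelayPath,
}

impl RelayPath {
    // Mirrors `execute_schedule`, which now needs `&mut self` in order to
    // take ownership of the operational-data queues it drains.
    fn execute_schedule(&mut self) {
        self.executions += 1;
    }
}

fn drive(link: &Mutex<Link>) {
    // Locking yields a `MutexGuard<Link>`; borrowing `&mut` through the
    // guard gives mutable access to the inner `RelayPath` for as long as
    // the guard (a temporary with an extended lifetime here) is alive.
    let relay_path = &mut link.lock().unwrap().a_to_b;
    relay_path.execute_schedule();
}
```

The `&mut guard.field` borrow works because `MutexGuard` implements `DerefMut`; the shared `&link` parameter is enough, since interior mutability lives inside the mutex.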
92 changes: 92 additions & 0 deletions tools/integration-test/src/tests/execute_schedule.rs
@@ -0,0 +1,92 @@
//! This test ensures that the `RelayPath::execute_schedule` method does not
//! drop any scheduled `OperationalData` when events associated with a prior
//! piece of operational data fail to send. Subsequent pieces of operational
//! data that were scheduled should be re-queued and not dropped.
//!
//! In order to test this behavior, the test manually relays a batch (i.e. at least
//! 2) of IBC transfers from chain A to chain B. Chain B is then shut down in order to
//! force the batch of messages (in the form of their associated pieces of operational
//! data) to be queued up again for re-submission.
//!
//! It is expected that the first message of the batch gets dropped (i.e. it is not
//! later found in the pending queue), but all of the subsequent messages should
//! exist in the pending queue.

use ibc_test_framework::prelude::*;
use ibc_test_framework::util::random::random_u64_range;

use ibc_relayer::link::{Link, LinkParameters};

/// The number of messages to send, each scheduled as its own piece of operational data.
const BATCH_SIZE: usize = 10;

#[test]
fn test_execute_schedule() -> Result<(), Error> {
run_binary_channel_test(&ExecuteScheduleTest)
}

pub struct ExecuteScheduleTest;

impl TestOverrides for ExecuteScheduleTest {
fn should_spawn_supervisor(&self) -> bool {
false
}
}

impl BinaryChannelTest for ExecuteScheduleTest {
fn run<ChainA: ChainHandle, ChainB: ChainHandle>(
&self,
_config: &TestConfig,
_relayer: RelayerDriver,
chains: ConnectedChains<ChainA, ChainB>,
channel: ConnectedChannel<ChainA, ChainB>,
) -> Result<(), Error> {
let amount1 = random_u64_range(1000, 5000);

let chain_a_link_opts = LinkParameters {
src_port_id: channel.port_a.clone().into_value(),
src_channel_id: channel.channel_id_a.clone().into_value(),
};

let chain_a_link = Link::new_from_opts(
chains.handle_a().clone(),
chains.handle_b().clone(),
chain_a_link_opts,
true,
)?;

let mut relay_path_a_to_b = chain_a_link.a_to_b;

// Construct `BATCH_SIZE` pieces of operational data and queue them up to be sent to chain B.
for i in 0..BATCH_SIZE {
chains.node_a.chain_driver().ibc_transfer_token(
&channel.port_a.as_ref(),
&channel.channel_id_a.as_ref(),
&chains.node_a.wallets().user1(),
&chains.node_b.wallets().user1().address(),
&chains.node_a.denom(),
amount1,
)?;

relay_path_a_to_b.schedule_packet_clearing(None)?;

info!("Performing IBC send packet with a token transfer #{} from chain A to be received by chain B", i);
}

// We should see that all of the events in the batch are queued up to be sent to chain B.
assert_eq!(relay_path_a_to_b.dst_operational_data.len(), BATCH_SIZE);

chains.node_b.value().kill()?;

// With chain B inactive, if we attempt to send the batch of messages, we expect to see
// `BATCH_SIZE` - 1 messages in the batch since the initial event should have failed to
// be relayed and was thus dropped. The subsequent messages in the batch should have all
// been re-added to the pending queue.
match relay_path_a_to_b.execute_schedule() {
Ok(_) => panic!("Expected an error when relaying tx from A to B"),
Err(_) => assert_eq!(relay_path_a_to_b.dst_operational_data.len(), BATCH_SIZE - 1),
}

Ok(())
}
}
1 change: 1 addition & 0 deletions tools/integration-test/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ pub mod clear_packet;
pub mod client_expiration;
mod client_settings;
pub mod connection_delay;
pub mod execute_schedule;
pub mod memo;
pub mod python;
mod query_packet;