Fix execute_schedule method leaking operational data #2118

Merged (43 commits) on May 27, 2022

Commits
042480e
Pull in upstream changes
seanchen1991 Apr 22, 2022
69a1f8f
Merge branch 'informalsystems:master' into master
seanchen1991 Apr 22, 2022
83b60d7
Adding integration test for execute_schedule
seanchen1991 Apr 22, 2022
60edac7
Adding integration test for execute_schedule
seanchen1991 Apr 22, 2022
5b26c2f
Finish stubbing out execute_schedule test
seanchen1991 Apr 22, 2022
37130a3
Call `chains.shutdown` method
seanchen1991 Apr 25, 2022
c032b4f
Correctly shut down chain
seanchen1991 Apr 25, 2022
a561715
Shut down node b as well
seanchen1991 Apr 26, 2022
fe08ba3
Debugging execute_schedule test
seanchen1991 May 9, 2022
37d96b7
Add Debug derivations
seanchen1991 May 10, 2022
e58a6e1
Merge branch 'master' of https://github.com/informalsystems/ibc-rs in…
seanchen1991 May 10, 2022
a1267f4
Update integration test so that its flow correctly tests `execute_sch…
seanchen1991 May 11, 2022
1fc709d
Attempt to perform a second IBC transfer
seanchen1991 May 11, 2022
cfc7c70
Remove info's
seanchen1991 May 11, 2022
4c327b2
Increase sleep timeout duration
seanchen1991 May 11, 2022
40a3ef3
Merge branch 'master' of https://github.com/informalsystems/ibc-rs in…
seanchen1991 May 11, 2022
dbe86ca
Incorporate new test framework features
seanchen1991 May 11, 2022
ac8532d
Remove unnecessary `sleep` call
seanchen1991 May 11, 2022
c3b0858
Correctly use new test framework features
seanchen1991 May 11, 2022
7b50b16
Get assertions passing for now
seanchen1991 May 11, 2022
de2ea25
Send two transactions, one in each direction
seanchen1991 May 13, 2022
543cfa7
Add doc comment for test
seanchen1991 May 13, 2022
8cb0aca
Improve panic messages
seanchen1991 May 13, 2022
3ddc4ef
Refactor test so that it is actually testing the desired behavior
seanchen1991 May 20, 2022
dedce23
Attempt at fixing `execute_schedule` leaky logic
seanchen1991 May 20, 2022
a948c3f
Flesh out doc comments some more
seanchen1991 May 20, 2022
0321f0e
Remove a duplicate function
seanchen1991 May 20, 2022
142feea
Make use of OperationalDataTarget enum
seanchen1991 May 20, 2022
0dec734
Remove redundant enum
seanchen1991 May 20, 2022
ef2014c
Remove some Debug derives
seanchen1991 May 20, 2022
ef3f498
Remove one more debug derive
seanchen1991 May 20, 2022
5afe71e
Merge branch 'master' into execute-schedule-test
seanchen1991 May 20, 2022
e102db1
Merge branch 'master' of https://github.com/informalsystems/ibc-rs in…
seanchen1991 May 20, 2022
8a3ed6b
Merge branch 'execute-schedule-test' of github.com:seanchen1991/ibc-r…
seanchen1991 May 20, 2022
77040d8
Add `try_fetch_scheduled_operational_data` back in
seanchen1991 May 20, 2022
6a7bf0e
Give `do_execute_schedule` a more descriptive name
seanchen1991 May 20, 2022
402b2c0
Improve `execute_schedule_for_target_chain` method's documentation
seanchen1991 May 23, 2022
26f61d0
Add a bunch of clarifying comments
seanchen1991 May 24, 2022
58d78cf
More clarification of comments
seanchen1991 May 24, 2022
746d990
Flesh out `OperationalData` docs
seanchen1991 May 24, 2022
d4342fe
Add changelog entry
seanchen1991 May 24, 2022
b9e2af3
Incorporate PR feedback
seanchen1991 May 26, 2022
e4dedbe
Merge branch 'master' into execute-schedule-test
adizere May 27, 2022
3 changes: 3 additions & 0 deletions relayer/src/link/operational_data.rs
@@ -17,9 +17,12 @@ use crate::chain::tracking::TrackingId;
use crate::link::error::LinkError;
use crate::link::RelayPath;

/// The chain that a piece of `OperationalData` is bound for.
#[derive(Clone, Copy, PartialEq)]
pub enum OperationalDataTarget {
/// The source chain.
Source,
/// The destination chain.
Member suggested a change:

- /// The destination chain.
+ /// The destination chain, i.e., the chain on the receiving side of packets.

Contributor Author replied:

The language being proposed for the Source variant vs. the Destination variant has me wondering what the relationship is between packets and operational data; how are the two related? Why is it that the destination chain receives packets, not operational data?

Member replied:

what the relationship is between packets and operational data; how are the two related?

This is also something I've been struggling a lot with. Hope these notes below help. The important concepts are marked in bold:

  • one chain (source) sends packets
    • a user (typically) submits transactions with this kind of packet
  • the other chain (destination) receives packets
    • a relayer submits this kind of packet to the destination chain
    • if no relayer acts in time, then the relayer submits a timeout packet to the source chain (instead of a receive packet to the destination chain)
  • when a chain processes a send packet, that translates into an event
    • when a chain processes a receive packet, that also translates into an event
    • Hermes acts upon these kinds of events reactively
  • Hermes translates events into operational data (a concept internal to Hermes), which encompasses both an event as well as any resulting -- receive or timeout -- packet that Hermes has to submit based on that event
  • so operational data is just an internal representation that Hermes keeps for pairing together an event with the action that that event entails

Aside from the main question:

  • all packets (send, receive, timeout) are IBC messages
  • besides packets, there are other types of messages: client update messages, notably, and soon relayer fee messages (I think)
  • connection and channel handshakes are also types of IBC messages
  • Hermes typically batches multiple IBC messages into a transaction

so the three primitive data types are:

  • packets, messages, and transactions

I'm thinking we should turn this into a diagram and put in the architecture or docs/ somewhere.
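In the meantime, the note above can be sketched at the type level. This is an illustrative model only, not the actual Hermes types (`IbcMessage`, `OperationalDataSketch`, and `TransactionSketch` are made-up names):

```rust
// Illustrative sketch only -- not the real Hermes types. It models the
// note above: an event observed on-chain is paired with the messages
// (receive/timeout packets, among others) that the event entails.

/// Packets are one kind of IBC message; client updates and handshakes are others.
#[allow(dead_code)]
enum IbcMessage {
    SendPacket,    // submitted (typically) by a user on the source chain
    RecvPacket,    // submitted by a relayer to the destination chain
    TimeoutPacket, // submitted by a relayer back to the source chain
    ClientUpdate,  // a non-packet message
}

/// "Operational data" pairs an observed event with the messages it entails.
struct OperationalDataSketch {
    triggering_event: String,  // e.g. the name of the on-chain event
    messages: Vec<IbcMessage>, // what Hermes must submit in response
}

/// Hermes typically batches multiple IBC messages into one transaction.
struct TransactionSketch {
    messages: Vec<IbcMessage>,
}

fn main() {
    // A send-packet event entails a receive packet on the destination chain.
    let od = OperationalDataSketch {
        triggering_event: "send_packet".to_string(),
        messages: vec![IbcMessage::RecvPacket],
    };
    let tx = TransactionSketch { messages: od.messages };
    assert_eq!(tx.messages.len(), 1);
}
```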

Contributor Author replied:

I opted to capture as much of this as is relevant to the OperationalData type in its doc comment. Hopefully it helps clear some of this up a bit.

Destination,
}

121 changes: 102 additions & 19 deletions relayer/src/link/relay_path.rs

@@ -98,8 +98,8 @@ pub struct RelayPath<ChainA: ChainHandle, ChainB: ChainHandle> {
// mostly timeout packet messages.
// The operational data targeting the destination chain
// comprises mostly RecvPacket and Ack msgs.
pub(crate) src_operational_data: Queue<OperationalData>,
pub(crate) dst_operational_data: Queue<OperationalData>,
pub src_operational_data: Queue<OperationalData>,
pub dst_operational_data: Queue<OperationalData>,

// Toggle for the transaction confirmation mechanism.
confirm_txes: bool,
@@ -391,7 +391,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
for i in 1..=MAX_RETRIES {
let cleared = self
.schedule_recv_packet_and_timeout_msgs(height, tracking_id)
.and_then(|()| self.schedule_packet_ack_msgs(height, tracking_id));
.and_then(|_| self.schedule_packet_ack_msgs(height, tracking_id));

match cleared {
Ok(()) => return Ok(()),
@@ -406,6 +406,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}

/// Clears any packets that were sent before `height`.
/// If no height is passed in, then the latest height of the source chain is used.
pub fn schedule_packet_clearing(&self, height: Option<Height>) -> Result<(), LinkError> {
let span = span!(Level::DEBUG, "clear");
let _enter = span.enter();
Expand Down Expand Up @@ -1278,27 +1279,109 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}
}

/// Checks if there are any operational data items ready,
/// and if so performs the relaying of corresponding packets
/// to the target chain.
/// Drives the relaying of elapsed operational data items meant for
/// a specified target chain forward.
///
/// This method performs relaying using the asynchronous sender.
/// Retains the operational data as pending, and associates it
/// with one or more transaction hash(es).
pub fn execute_schedule(&self) -> Result<(), LinkError> {
let (src_ods, dst_ods) = self.try_fetch_scheduled_operational_data()?;
/// Given an iterator of `OperationalData` elements, this function
/// first determines whether the current piece of operational data
/// has elapsed.
///
/// A piece of operational data is considered 'elapsed' if it has surpassed
/// its target chain's:
/// 1. Latest timestamp
/// 2. Maximum block time
/// 3. Latest height
///
/// If the current piece of operational data has elapsed, then relaying
/// is performed using the asynchronous sender. Operational data is
/// retained as pending and is associated with one or more transaction
/// hash(es).
///
/// Should an error occur when attempting to relay a piece of operational
/// data, this function returns all subsequent unprocessed pieces of
/// operational data back to the caller so that they can be re-queued
/// for processing; the operational data that failed to send is dropped.
///
/// Note that pieces of operational data that have not elapsed yet are
/// also placed in the 'unprocessed' bucket.
fn execute_schedule_for_target_chain<I: Iterator<Item = OperationalData>>(
&mut self,
mut operations: I,
target_chain: OperationalDataTarget,
) -> Result<VecDeque<OperationalData>, (VecDeque<OperationalData>, LinkError)> {
let mut unprocessed = VecDeque::new();

while let Some(od) = operations.next() {
let elapsed_result = match target_chain {
OperationalDataTarget::Source => od.has_conn_delay_elapsed(
&|| self.src_time_latest(),
&|| self.src_max_block_time(),
&|| self.src_latest_height(),
),
OperationalDataTarget::Destination => od.has_conn_delay_elapsed(
&|| self.dst_time_latest(),
&|| self.dst_max_block_time(),
&|| self.dst_latest_height(),
),
};

for od in dst_ods {
let reply =
self.relay_from_operational_data::<relay_sender::AsyncSender>(od.clone())?;
match elapsed_result {
Ok(elapsed) => {
if elapsed {
match self
.relay_from_operational_data::<relay_sender::AsyncSender>(od.clone())
{
Ok(reply) => self.enqueue_pending_tx(reply, od),
Err(e) => {
unprocessed.extend(operations);

self.enqueue_pending_tx(reply, od);
return Err((unprocessed, e));
}
}
} else {
unprocessed.push_back(od);
}
}
Err(e) => {
unprocessed.push_back(od);
unprocessed.extend(operations);

return Err((unprocessed, e));
}
}
}

for od in src_ods {
let reply =
self.relay_from_operational_data::<relay_sender::AsyncSender>(od.clone())?;
self.enqueue_pending_tx(reply, od);
Ok(unprocessed)
}

/// While there are pending operational data items, this function
/// performs the relaying of packets corresponding to those
/// operational data items to both the source and destination chains.
///
/// Any operational data items that do not get successfully relayed are
/// dropped. Subsequent pending operational data items that went unprocessed
/// are queued up again for re-submission.
pub fn execute_schedule(&mut self) -> Result<(), LinkError> {
let src_od_iter = self.src_operational_data.take().into_iter();

match self.execute_schedule_for_target_chain(src_od_iter, OperationalDataTarget::Source) {
Ok(unprocessed_src_data) => self.src_operational_data = unprocessed_src_data.into(),
Err((unprocessed_src_data, e)) => {
self.src_operational_data = unprocessed_src_data.into();
return Err(e);
}
}

let dst_od_iter = self.dst_operational_data.take().into_iter();

match self
.execute_schedule_for_target_chain(dst_od_iter, OperationalDataTarget::Destination)
{
Ok(unprocessed_dst_data) => self.dst_operational_data = unprocessed_dst_data.into(),
Err((unprocessed_dst_data, e)) => {
self.dst_operational_data = unprocessed_dst_data.into();
return Err(e);
}
}

Ok(())
6 changes: 6 additions & 0 deletions relayer/src/util/queue.rs
@@ -63,3 +63,9 @@ impl<T> Default for Queue<T> {
Self::new()
}
}

impl<T> From<VecDeque<T>> for Queue<T> {
fn from(deque: VecDeque<T>) -> Self {
Queue(Arc::new(RwLock::new(deque)))
}
}
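This new impl is what enables the `unprocessed.into()` calls in `execute_schedule` above. A minimal standalone sketch of the same conversion, using a stripped-down stand-in for the real `Queue<T>` in `relayer/src/util/queue.rs` (the `len` helper here is illustrative):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, RwLock};

// Stripped-down stand-in for relayer/src/util/queue.rs's Queue<T>:
// a shared, lock-protected double-ended queue.
struct Queue<T>(Arc<RwLock<VecDeque<T>>>);

impl<T> Queue<T> {
    fn len(&self) -> usize {
        self.0.read().unwrap().len()
    }
}

impl<T> From<VecDeque<T>> for Queue<T> {
    fn from(deque: VecDeque<T>) -> Self {
        Queue(Arc::new(RwLock::new(deque)))
    }
}

fn main() {
    // The pattern used by execute_schedule: take items out as a plain
    // VecDeque, then hand the remainder back to the shared queue via `.into()`.
    let mut unprocessed: VecDeque<u32> = VecDeque::new();
    unprocessed.push_back(1);
    unprocessed.push_back(2);

    let queue: Queue<u32> = unprocessed.into();
    assert_eq!(queue.len(), 2);
}
```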
6 changes: 3 additions & 3 deletions relayer/src/worker/packet.rs
@@ -73,7 +73,7 @@ pub fn spawn_packet_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
};

spawn_background_task(span, Some(Duration::from_millis(1000)), move || {
let relay_path = &link.lock().unwrap().a_to_b;
let relay_path = &mut link.lock().unwrap().a_to_b;

relay_path
.refresh_schedule()
@@ -120,7 +120,7 @@ pub fn spawn_packet_cmd_worker<ChainA: ChainHandle, ChainB: ChainHandle>(
retry_with_index(retry_strategy::worker_stubborn_strategy(), |index| {
handle_packet_cmd(
&mut is_first_run,
&link.lock().unwrap(),
&mut link.lock().unwrap(),
clear_on_start,
clear_interval,
&path,
@@ -145,7 +145,7 @@
/// data that is ready.
fn handle_packet_cmd<ChainA: ChainHandle, ChainB: ChainHandle>(
is_first_run: &mut bool,
link: &Link<ChainA, ChainB>,
link: &mut Link<ChainA, ChainB>,
clear_on_start: bool,
clear_interval: u64,
path: &Packet,
82 changes: 82 additions & 0 deletions tools/integration-test/src/tests/execute_schedule.rs

@@ -0,0 +1,82 @@
//! This test ensures that the `RelayPath::execute_schedule` method does not
//! drop any pending scheduled operational data when a prior transaction fails
//! to send. Subsequent pieces of operational data that were scheduled should
//! be re-queued and not dropped.
//!
//! In order to test this behavior, the test manually relays at least 2 IBC transfers
//! from chain A to chain B. Chain B is then shut down in order to force the transactions
//! to be queued up again for re-submission. It is expected that the first transfer
//! be dropped and not be present in the pending queue, but all of the subsequent
//! transactions should exist in the pending queue.

use ibc_test_framework::prelude::*;
use ibc_test_framework::util::random::random_u64_range;

use ibc_relayer::link::{Link, LinkParameters};

const NUM_TXS: usize = 10;

#[test]
fn test_execute_schedule() -> Result<(), Error> {
run_binary_channel_test(&ExecuteScheduleTest)
}

pub struct ExecuteScheduleTest;

impl TestOverrides for ExecuteScheduleTest {
fn should_spawn_supervisor(&self) -> bool {
false
}
}

impl BinaryChannelTest for ExecuteScheduleTest {
fn run<ChainA: ChainHandle, ChainB: ChainHandle>(
&self,
_config: &TestConfig,
_relayer: RelayerDriver,
chains: ConnectedChains<ChainA, ChainB>,
channel: ConnectedChannel<ChainA, ChainB>,
) -> Result<(), Error> {
let amount1 = random_u64_range(1000, 5000);

let chain_a_link_opts = LinkParameters {
src_port_id: channel.port_a.clone().into_value(),
src_channel_id: channel.channel_id_a.clone().into_value(),
};

let chain_a_link = Link::new_from_opts(
chains.handle_a().clone(),
chains.handle_b().clone(),
chain_a_link_opts,
true,
)?;

let mut relay_path_a_to_b = chain_a_link.a_to_b;

for i in 0..NUM_TXS {
chains.node_a.chain_driver().ibc_transfer_token(
&channel.port_a.as_ref(),
&channel.channel_id_a.as_ref(),
&chains.node_a.wallets().user1(),
&chains.node_b.wallets().user1().address(),
&chains.node_a.denom(),
amount1,
)?;

relay_path_a_to_b.schedule_packet_clearing(None)?;

info!("Performing IBC transfer #{} from chain A to chain B", i);
}

assert_eq!(relay_path_a_to_b.dst_operational_data.len(), NUM_TXS);

chains.node_b.value().kill()?;

match relay_path_a_to_b.execute_schedule() {
Ok(_) => panic!("Expected an error when relaying tx from A to B"),
Err(_) => assert_eq!(relay_path_a_to_b.dst_operational_data.len(), NUM_TXS - 1),
}

Ok(())
}
}
1 change: 1 addition & 0 deletions tools/integration-test/src/tests/mod.rs
@@ -9,6 +9,7 @@ pub mod clear_packet;
pub mod client_expiration;
mod client_settings;
pub mod connection_delay;
pub mod execute_schedule;
pub mod memo;
pub mod python;
mod query_packet;