Skip to content

Commit

Permalink
Split packet clearing schedule from packet worker (#4080)
Browse files Browse the repository at this point in the history
* Split packet clearing schedule from packet worker

* Rename packet clearing test to be more specific

* Nit clean-up for ordered_channel test

* Clear packets in packet_cmd_worker for ordered channels only

* Add intermediary channel to forward cmd from clear worker to packet worker

* Add changelog entry

* Add new config 'clear_limit'

* Apply clear_limit when clearing pending packets

* Add test for clear_limit

* Fix clear limit for send packets

* Update changelog entry

* Add 'clear_limit' to example config

* Restore changelog entry deleted after merge

* Apply suggestions from code review

Co-authored-by: Romain Ruetschi <romain@informal.systems>
Signed-off-by: Luca Joss <43531661+ljoss17@users.noreply.github.com>

* Moved 'clear_limit' after 'clear_on_start' in example config.toml

---------

Signed-off-by: Luca Joss <43531661+ljoss17@users.noreply.github.com>
Co-authored-by: Romain Ruetschi <romain@informal.systems>
  • Loading branch information
ljoss17 and romac authored Aug 6, 2024
1 parent f49a843 commit f9b0194
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- Add a new configuration `clear_limit` to specify the maximum number
of packets cleared every time packet clearing is triggered.
Defaults to 50.
([\#4071](https://github.com/informalsystems/hermes/issues/4071))
4 changes: 4 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ clear_interval = 100
# Whether or not to clear packets on start. [Default: true]
clear_on_start = true

# Set the maximum number of packets to clear each time packet clearing is triggered.
# [Default: 50]
#clear_limit = 50

# Toggle the transaction confirmation mechanism.
# The tx confirmation mechanism periodically queries the `/tx_search` RPC
# endpoint to check that previously-submitted transactions
Expand Down
7 changes: 7 additions & 0 deletions crates/relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ pub mod default {
/// Default value for the `allow_ccq` setting.
pub fn allow_ccq() -> bool {
    // Enabled unless the operator explicitly turns it off in the config.
    true
}

/// Default value for the `clear_limit` setting: the maximum number of
/// packets processed each time packet clearing is triggered.
pub fn clear_limit() -> usize {
    // Matches the documented default in the example config.toml.
    50
}
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
Expand Down Expand Up @@ -424,6 +428,8 @@ pub struct Packets {
pub ics20_max_memo_size: Ics20FieldSizeLimit,
#[serde(default = "default::ics20_max_receiver_size")]
pub ics20_max_receiver_size: Ics20FieldSizeLimit,
#[serde(default = "default::clear_limit")]
pub clear_limit: usize,

#[serde(skip)]
pub force_disable_clear_on_start: bool,
Expand All @@ -439,6 +445,7 @@ impl Default for Packets {
auto_register_counterparty_payee: default::auto_register_counterparty_payee(),
ics20_max_memo_size: default::ics20_max_memo_size(),
ics20_max_receiver_size: default::ics20_max_receiver_size(),
clear_limit: default::clear_limit(),
force_disable_clear_on_start: false,
}
}
Expand Down
39 changes: 29 additions & 10 deletions crates/relayer/src/link/relay_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,11 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
TrackedEvents::new(result, tracking_id)
}

fn relay_pending_packets(&self, height: Option<Height>) -> Result<(), LinkError> {
fn relay_pending_packets(
&self,
height: Option<Height>,
clear_limit: usize,
) -> Result<(), LinkError> {
let _span = span!(Level::ERROR, "relay_pending_packets", ?height).entered();

let tracking_id = TrackingId::new_packet_clearing();
Expand All @@ -439,10 +443,15 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
let chunk_size = src_config.query_packets_chunk_size();

for i in 1..=MAX_RETRIES {
let cleared_recv =
self.schedule_recv_packet_and_timeout_msgs(height, chunk_size, tracking_id);
let cleared_recv = self.schedule_recv_packet_and_timeout_msgs(
height,
chunk_size,
clear_limit,
tracking_id,
);

let cleared_ack = self.schedule_packet_ack_msgs(height, chunk_size, tracking_id);
let cleared_ack =
self.schedule_packet_ack_msgs(height, chunk_size, clear_limit, tracking_id);

match cleared_recv.and(cleared_ack) {
Ok(()) => return Ok(()),
Expand All @@ -458,14 +467,18 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {

/// Clears any packets that were sent before `height`.
/// If no height is passed in, then the latest height of the source chain is used.
pub fn schedule_packet_clearing(&self, height: Option<Height>) -> Result<(), LinkError> {
pub fn schedule_packet_clearing(
&self,
height: Option<Height>,
clear_limit: usize,
) -> Result<(), LinkError> {
let _span = span!(Level::ERROR, "schedule_packet_clearing", ?height).entered();

let clear_height = height
.map(|h| h.decrement().map_err(|e| LinkError::decrement_height(h, e)))
.transpose()?;

self.relay_pending_packets(clear_height)?;
self.relay_pending_packets(clear_height, clear_limit)?;

debug!(height = ?clear_height, "done relaying pending packets at clear height");

Expand Down Expand Up @@ -1139,6 +1152,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
&self,
opt_query_height: Option<Height>,
chunk_size: usize,
clear_limit: usize,
tracking_id: TrackingId,
) -> Result<(), LinkError> {
let _span = span!(
Expand All @@ -1161,11 +1175,13 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}

// Retain only sequences which should not be filtered out
let sequences: Vec<Sequence> = sequences
let raw_sequences: Vec<Sequence> = sequences
.into_iter()
.filter(|sequence| !self.exclude_src_sequences.contains(sequence))
.collect();

let sequences = &raw_sequences[..raw_sequences.len().min(clear_limit)];

debug!(
dst_chain = %self.dst_chain().id(),
src_chain = %self.src_chain().id(),
Expand All @@ -1177,7 +1193,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
// Chunk-up the list of sequence nrs. into smaller parts,
// and schedule operational data incrementally across each chunk.
for events_chunk in query_packet_events_with(
&sequences,
sequences,
Qualified::SmallerEqual(query_height),
self.src_chain(),
&self.path_id,
Expand Down Expand Up @@ -1206,6 +1222,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
&self,
opt_query_height: Option<Height>,
chunk_size: usize,
clear_limit: usize,
tracking_id: TrackingId,
) -> Result<(), LinkError> {
let _span = span!(
Expand All @@ -1231,11 +1248,13 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {
}

// Retain only sequences which should not be filtered out
let sequences: Vec<Sequence> = sequences
let raw_sequences: Vec<Sequence> = sequences
.into_iter()
.filter(|sequence| !self.exclude_src_sequences.contains(sequence))
.collect();

let sequences = &raw_sequences[..raw_sequences.len().min(clear_limit)];

debug!(
dst_chain = %self.dst_chain().id(),
src_chain = %self.src_chain().id(),
Expand All @@ -1246,7 +1265,7 @@ impl<ChainA: ChainHandle, ChainB: ChainHandle> RelayPath<ChainA, ChainB> {

// Incrementally process all the available sequence numbers in chunks
for events_chunk in query_packet_events_with(
&sequences,
sequences,
Qualified::SmallerEqual(query_height),
self.src_chain(),
&self.path_id,
Expand Down
16 changes: 14 additions & 2 deletions crates/relayer/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,31 @@ pub fn spawn_worker_tasks<ChainA: ChainHandle, ChainB: ChainHandle>(

let resubmit = Resubmit::from_clear_interval(clear_interval);

let (clear_cmd_tx, clear_cmd_rx) = crossbeam_channel::unbounded();
let clear_task = packet::spawn_clear_cmd_worker(
cmd_rx,
link.clone(),
should_clear_on_start,
clear_interval,
config.mode.packets.clear_limit,
clear_cmd_tx,
);
task_handles.push(clear_task);

// Only spawn the incentivized worker if a fee filter is specified in the configuration
let packet_task = match fee_filter {
Some(filter) => packet::spawn_incentivized_packet_cmd_worker(
cmd_rx,
clear_cmd_rx,
link.clone(),
path.clone(),
filter,
),
None => packet::spawn_packet_cmd_worker(
cmd_rx,
clear_cmd_rx,
link.clone(),
should_clear_on_start,
clear_interval,
config.mode.packets.clear_limit,
path.clone(),
),
};
Expand Down
Loading

0 comments on commit f9b0194

Please sign in to comment.