Skip to content

Commit

Permalink
Cleanup process_wait_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-casperlabs committed Jan 22, 2024
1 parent 601a3c8 commit a78a7fd
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

* The IO layer will no longer drop frames if no multi-frame payloads are sent while a non-multi-frame payload has been moved to the wait queue due to exceeding the in-flight request limit.
* The outgoing request queue will now process much faster when filled with large amounts of requests.

## [0.2.0] - 2023-11-24

Expand Down
42 changes: 12 additions & 30 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,9 +792,6 @@ where
// Once the scheduled frame is processed, we will finished the multi-frame
// transfer, so we can allow for the next multi-frame transfer to be scheduled.
self.active_multi_frame[about_to_finish.channel().get() as usize] = None;

// Ensure the next multi-frame message gets scheduled, if available.
self.process_wait_queue(about_to_finish.channel())?;
}
}
}
Expand All @@ -805,43 +802,28 @@ where

/// Process the wait queue of a given channel, promoting messages that are ready to be sent.
fn process_wait_queue(&mut self, channel: ChannelId) -> Result<(), LocalProtocolViolation> {
let chan = &mut self.wait_queue[channel.get() as usize];
let mut max_items_to_check = chan.len();
#[cfg(feature = "tracing")]
tracing::trace!(%channel, "processing dirty channel");

// The code below is not as bad it looks complexity wise, anticipating two common cases:
//
// 1. A multi-frame read has finished, with capacity for requests to spare. Only
// multi-frame requests will be waiting in the wait queue, so we will likely pop the
// first item, only scanning the rest once.
// 2. One or more requests finished, so we also have a high chance of picking the first
// few requests out of the queue.
let mut ready = Vec::new();

while let Some(item) = chan.pop_front() {
let mut remaining = self.wait_queue[channel.get() as usize].len();

while let Some(item) = self.wait_queue[channel.get() as usize].pop_front() {
if item_should_wait(&item, &self.juliet, &self.active_multi_frame)?.is_some() {
#[cfg(feature = "tracing")]
tracing::trace!(%item, "still waiting");
// Put it right back into the queue.
chan.push_back(item);
self.wait_queue[channel.get() as usize].push_back(item);
} else {
#[cfg(feature = "tracing")]
tracing::debug!(%item, "became ready");
ready.push(item);
self.send_to_ready_queue(item)?;

// No need to look further if we have saturated the channel.
if !self.juliet.allowed_to_send_request(channel)? {
break;
}
}

// Ensure we do not loop endlessly if we cannot find anything.
max_items_to_check -= 1;
if max_items_to_check == 0 {
remaining -= 1;
if remaining == 0 {
break;
}
}

for item in ready {
self.send_to_ready_queue(item)?;
}

Ok(())
}
}
Expand Down

0 comments on commit a78a7fd

Please sign in to comment.