Skip to content

Commit

Permalink
avoid creating Delay multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Mikhailov committed May 4, 2024
1 parent dac445c commit 72a193a
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub struct ChunksTimeout<St: Stream> {
cap: usize,
// https://github.com/rust-lang-nursery/futures-rs/issues/1475
clock: Option<Delay>,
clock_used: bool,
duration: Duration,
}

Expand All @@ -74,6 +75,7 @@ where
St: Stream,
{
unsafe_unpinned!(items: Vec<St::Item>);
unsafe_unpinned!(clock_used: bool);
unsafe_pinned!(clock: Option<Delay>);
unsafe_pinned!(stream: Fuse<St>);

Expand All @@ -85,6 +87,7 @@ where
items: Vec::with_capacity(capacity),
cap: capacity,
clock: None,
clock_used: false,
duration,
}
}
Expand Down Expand Up @@ -131,6 +134,9 @@ impl<St: Stream> Stream for ChunksTimeout<St> {
type Item = Vec<St::Item>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let duration = self.duration;
let make_clock = move || Delay::new(duration);

loop {
match self.as_mut().stream().poll_next(cx) {
Poll::Ready(item) => match item {
Expand All @@ -139,11 +145,15 @@ impl<St: Stream> Stream for ChunksTimeout<St> {
// the full one.
Some(item) => {
if self.items.is_empty() {
*self.as_mut().clock() = Some(Delay::new(self.duration));
*self.as_mut().clock_used() = true;
self.as_mut()
.clock()
.get_or_insert_with(make_clock)
.reset(duration);
}
self.as_mut().items().push(item);
if self.items.len() >= self.cap {
*self.as_mut().clock() = None;
*self.as_mut().clock_used() = false;
return Poll::Ready(Some(self.as_mut().take()));
} else {
// Continue the loop
Expand All @@ -168,26 +178,22 @@ impl<St: Stream> Stream for ChunksTimeout<St> {
Poll::Pending => {}
}

match self
.as_mut()
.clock()
.as_pin_mut()
.map(|clock| clock.poll(cx))
{
Some(Poll::Ready(())) => {
*self.as_mut().clock() = None;
if !self.clock_used {
debug_assert!(
self.items().is_empty(),
"Inner buffer is empty, but clock is available."
);
return Poll::Pending;
}

let clock = self.as_mut().clock().get_mut().get_or_insert_with(make_clock);
match Pin::new(clock).poll(cx) {
Poll::Ready(()) => {
*self.as_mut().clock_used() = false;
return Poll::Ready(Some(self.as_mut().take()));
}
Some(Poll::Pending) => {}
None => {
debug_assert!(
self.items().is_empty(),
"Inner buffer is empty, but clock is available."
);
}
Poll::Pending => return Poll::Pending,
}

return Poll::Pending;
}
}

Expand Down

0 comments on commit 72a193a

Please sign in to comment.