Skip to content

Commit

Permalink
fix: next may called on stream even after the stream has finished
Browse files Browse the repository at this point in the history
  • Loading branch information
SoumyaRanjanPatnaik committed Feb 11, 2024
1 parent c36ce61 commit 5151aa9
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 19 deletions.
5 changes: 2 additions & 3 deletions src/formatting/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ pub fn manage_widgets_updates() -> (UnboundedSender<(usize, Vec<u64>)>, BoxedStr
}
}
},
)
.boxed();
(intervals_tx, stream)
);
(intervals_tx, Box::pin(stream.fuse()))
}

fn single_block_next_update(intervals: &[u64], time: u64, last_update: u64) -> u64 {
Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use futures::Stream;
use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
use once_cell::sync::Lazy;
use tokio::process::Command;
use tokio::sync::{mpsc, Notify};
Expand Down Expand Up @@ -70,7 +69,7 @@ static REQWEST_CLIENT_IPV4: Lazy<reqwest::Client> = Lazy::new(|| {

type BoxedFuture<T> = Pin<Box<dyn Future<Output = T>>>;

type BoxedStream<T> = Pin<Box<dyn Stream<Item = T>>>;
type BoxedStream<T> = Pin<Box<dyn FusedStream<Item = T>>>;

type WidgetUpdatesSender = mpsc::UnboundedSender<(usize, Vec<u64>)>;

Expand Down
13 changes: 7 additions & 6 deletions src/protocol/i3bar_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn unprocessed_events_stream(invert_scrolling: bool) -> BoxedStream<I3BarEvent>
let stdin = unsafe { File::from_raw_fd(0) };
let lines = BufReader::new(stdin).lines();

futures::stream::unfold(lines, move |mut lines| async move {
let stream = futures::stream::unfold(lines, move |mut lines| async move {
loop {
// Take only the valid JSON object between curly braces (cut off leading bracket, commas and whitespace)
let line = lines.next_line().await.ok().flatten()?;
Expand Down Expand Up @@ -69,16 +69,17 @@ fn unprocessed_events_stream(invert_scrolling: bool) -> BoxedStream<I3BarEvent>

break Some((event, lines));
}
})
.boxed_local()
});
Box::pin(stream.fuse())

}

pub fn events_stream(
invert_scrolling: bool,
double_click_delay: Duration,
) -> BoxedStream<I3BarEvent> {
let events = unprocessed_events_stream(invert_scrolling);
futures::stream::unfold((events, None), move |(mut events, pending)| async move {
let stream = futures::stream::unfold((events, None), move |(mut events, pending)| async move {
if let Some(pending) = pending {
return Some((pending, (events, None)));
}
Expand All @@ -98,6 +99,6 @@ pub fn events_stream(
}

Some((event, (events, None)))
})
.boxed_local()
});
Box::pin(stream.fuse())
}
13 changes: 6 additions & 7 deletions src/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ pub enum Signal {
pub fn signals_stream() -> BoxedStream<Signal> {
let (sigmin, sigmax) = (SIGRTMIN(), SIGRTMAX());
let signals = Signals::new((sigmin..sigmax).chain([SIGUSR1, SIGUSR2])).unwrap();
signals
.map(move |signal| match signal {
SIGUSR1 => Signal::Usr1,
SIGUSR2 => Signal::Usr2,
x => Signal::Custom(x - sigmin),
})
.boxed()
let stream = signals.map(move |signal| match signal {
SIGUSR1 => Signal::Usr1,
SIGUSR2 => Signal::Usr2,
x => Signal::Custom(x - sigmin),
});
Box::pin(stream.fuse())
}

0 comments on commit 5151aa9

Please sign in to comment.