Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce redundant flush wakes #933

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ pub(crate) struct ConnectionHandler {
max_pings: usize,
info_sender: tokio::sync::watch::Sender<ServerInfo>,
ping_interval: Interval,
flush_interval: Interval,
flush_period: Duration,
flush_wanted: bool,
}

impl ConnectionHandler {
Expand All @@ -318,9 +319,6 @@ impl ConnectionHandler {
let mut ping_interval = interval(ping_period);
ping_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut flush_interval = interval(flush_period);
flush_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

ConnectionHandler {
connection,
connector,
Expand All @@ -329,7 +327,8 @@ impl ConnectionHandler {
max_pings: 2,
info_sender,
ping_interval,
flush_interval,
flush_period,
flush_wanted: false,
}
}

Expand All @@ -338,6 +337,12 @@ impl ConnectionHandler {
mut receiver: mpsc::Receiver<Command>,
) -> Result<(), io::Error> {
loop {
let flush_sleep = if self.flush_wanted {
tokio::time::sleep(self.flush_period).boxed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create a lot of allocator noise. What if we made the Interval very high (a few days long) when we don't want any flushes, and lowered it back to the configured value when we want it back?

Copy link
Contributor

@paolobarbolini paolobarbolini Apr 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forget that. There's no way to change an Interval's interval. I wander if we ignored the first tick, if it would stop waking the runtime up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we could make our own Interval wrapper around Sleep

This function can be called both before and after the future has completed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create a lot of allocator noise.
Aye, but we can take it out again with a little bit of work.

Or we could make our own Interval wrapper around Sleep

Yeah short rundown is that we'd want to return pending when flush not wanted, store the waker, wake up when flush wanted is.

} else {
std::future::pending().boxed()
};

select! {
_ = self.ping_interval.tick().fuse() => {
self.pending_pings += 1;
Expand All @@ -348,11 +353,12 @@ impl ConnectionHandler {
self.handle_flush().await?;

},
_ = self.flush_interval.tick().fuse() => {
_ = flush_sleep.fuse() => {
if let Err(_err) = self.handle_flush().await {
self.handle_disconnect().await?;
}
},

maybe_command = receiver.recv().fuse() => {
match maybe_command {
Some(command) => if let Err(err) = self.handle_command(command).await {
Expand Down Expand Up @@ -480,13 +486,14 @@ impl ConnectionHandler {

async fn handle_flush(&mut self) -> Result<(), io::Error> {
self.connection.flush().await?;
self.flush_interval.reset();
self.flush_wanted = false;

Ok(())
}

async fn handle_command(&mut self, command: Command) -> Result<(), io::Error> {
self.ping_interval.reset();
self.flush_wanted = true;

match command {
Command::Unsubscribe { sid, max } => {
Expand Down