-
Notifications
You must be signed in to change notification settings - Fork 624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: properly stop prefetching background threads #7712
Conversation
Explicitly stop and wait for prefetching background threads to terminate when the `ShardTriesInner` is dropped. This avoids that estimations are influenced by background threads left over from previous estimations, which we have observed since merging near#7661.
replaces #7689 |
@@ -334,7 +337,7 @@ impl PrefetchStagingArea { | |||
Some(_) => (), | |||
None => return None, | |||
} | |||
std::thread::sleep(std::time::Duration::from_micros(1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unreleated to the PR, but this one here looks quite suspect. Should this have a Condvar instead of a busy-wait here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah from a logical point it almost certainly should. But I suspect it would also make code quite a bit more complex.
To elaborate a bit, we are waiting for one of two events to happen here:
- An IO background thread puts the value into the staging area.
- A main thread puts the value in the shard cache.
Both these options are possible regardless of whether blocking_get
got called from a main thread or from a background thread.
The case where we are a main thread and another main thread puts it in the shard cache is also not handled properly, yet. Let me create an issue for cleaning that up...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#[must_use = "When dropping this handle, the IO threads will be aborted immediately."] | ||
pub(crate) struct PrefetchingThreadsHandle { | ||
/// Shutdown channels to all spawned threads. | ||
shutdown_channels: Vec<crossbeam::channel::Sender<()>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One channel should be enough here, Receiver: Clone
for tx in &self.shutdown_channels { | ||
let e = tx.send(()); | ||
if e.is_err() { | ||
// Usually senders are dropped after joining all background threads. | ||
// But if this order is reversed, this send here will fail. This | ||
// is perfectly valid behavior and should not be treated as error. | ||
debug!("IO thread already hung up when trying to shut it down."); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for tx in &self.shutdown_channels { | |
let e = tx.send(()); | |
if e.is_err() { | |
// Usually senders are dropped after joining all background threads. | |
// But if this order is reversed, this send here will fail. This | |
// is perfectly valid behavior and should not be treated as error. | |
debug!("IO thread already hung up when trying to shut it down."); | |
} | |
} | |
self.shutdown_channels.clear() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is, closing a channel gives the signal we want here, we don't need special code to handle this.
let (shutdown_tx, shutdown_rx) = crossbeam::channel::bounded(1); | ||
let handle = thread::spawn(move || { | ||
loop { | ||
select! { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a huge fan of putting ladge chunks of code into a select. I'd write htis as
loop {
let next_item = select! {}
match next_item { None => return, Some(it) => process }
}
Explicitly stop and wait for prefetching background threads to terminate when the `ShardTriesInner` is dropped. This avoids that estimations are influenced by background threads left over from previous estimations, which we have observed since merging #7661.
Explicitly stop and wait for prefetching background threads to terminate when the
ShardTriesInner
is dropped.This avoids that estimations are influenced by background threads left
over from previous estimations, which we have observed since merging
#7661.