Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cp 1.23] fix deadlock on nested DropHelper #15325

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/aptos-drop-helper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ aptos-metrics-core = { workspace = true }
derive_more = { workspace = true }
once_cell = { workspace = true }
threadpool = { workspace = true }

[dev-dependencies]
rayon = { workspace = true }
59 changes: 53 additions & 6 deletions crates/aptos-drop-helper/src/async_concurrent_dropper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::{GAUGE, TIMER};
use crate::{
metrics::{GAUGE, TIMER},
IN_ANY_DROP_POOL,
};
use aptos_infallible::Mutex;
use aptos_metrics_core::{IntGaugeHelper, TimerHelper};
use std::sync::{
Expand Down Expand Up @@ -42,12 +45,25 @@ impl AsyncConcurrentDropper {
rx
}

/// Returns the `max_tasks` value stored in this dropper's task tracker.
///
/// NOTE(review): presumably the upper bound on queued/in-flight drop tasks —
/// confirm against `NumTasksTracker`.
pub fn max_tasks(&self) -> usize {
self.num_tasks_tracker.max_tasks
}

/// Returns the value reported by `max_count()` on the underlying thread pool.
///
/// NOTE(review): presumably the number of worker threads available for
/// running drops — confirm against the `threadpool` crate documentation.
pub fn num_threads(&self) -> usize {
self.thread_pool.max_count()
}

/// Delegates to the task tracker's `wait_for_backlog_drop`, passing
/// `no_more_than` through unchanged.
///
/// The call is timed under the labels `[self.name, "wait_for_backlog_drop"]`.
///
/// NOTE(review): presumably blocks until at most `no_more_than` drop tasks
/// remain outstanding — confirm against `NumTasksTracker`.
pub fn wait_for_backlog_drop(&self, no_more_than: usize) {
// Keep the timer guard alive for the whole wait so the full duration is recorded.
let _timer = TIMER.timer_with(&[self.name, "wait_for_backlog_drop"]);
self.num_tasks_tracker.wait_for_backlog_drop(no_more_than);
}

fn schedule_drop_impl<V: Send + 'static>(&self, v: V, notif_sender_opt: Option<Sender<()>>) {
if IN_ANY_DROP_POOL.get() {
Self::do_drop(v, notif_sender_opt);
return;
}

let _timer = TIMER.timer_with(&[self.name, "enqueue_drop"]);
self.num_tasks_tracker.inc();

Expand All @@ -57,15 +73,23 @@ impl AsyncConcurrentDropper {
self.thread_pool.execute(move || {
let _timer = TIMER.timer_with(&[name, "real_drop"]);

drop(v);
IN_ANY_DROP_POOL.with(|flag| {
flag.set(true);
});

if let Some(sender) = notif_sender_opt {
sender.send(()).ok();
}
Self::do_drop(v, notif_sender_opt);

num_tasks_tracker.dec();
})
}

/// Drops `v` on the calling thread, then signals completion if requested.
///
/// When `notif_sender_opt` is `Some`, a unit message is sent on it after `v`
/// has been dropped. A failed send (for example, the receiving side is
/// already gone) is deliberately ignored.
fn do_drop<V: Send + 'static>(v: V, notif_sender_opt: Option<Sender<()>>) {
    // Run the value's destructor before notifying anyone.
    std::mem::drop(v);

    if let Some(done_tx) = notif_sender_opt {
        // Best effort: nobody listening is not an error here.
        let _ = done_tx.send(());
    }
}
}

struct NumTasksTracker {
Expand Down Expand Up @@ -111,10 +135,12 @@ impl NumTasksTracker {

#[cfg(test)]
mod tests {
use crate::AsyncConcurrentDropper;
use crate::{AsyncConcurrentDropper, DropHelper, DEFAULT_DROPPER};
use rayon::prelude::*;
use std::{sync::Arc, thread::sleep, time::Duration};
use threadpool::ThreadPool;

#[derive(Clone, Default)]
struct SlowDropper;

impl Drop for SlowDropper {
Expand Down Expand Up @@ -197,4 +223,25 @@ mod tests {
s.wait_for_backlog_drop(0);
assert!(now.elapsed() < Duration::from_millis(600));
}

// Regression test for nested `DropHelper`s: dropping a `DropHelper` whose
// payload itself contains a `DropHelper` must not deadlock the drop queue.
#[test]
fn test_nested_drops() {
// A payload that owns another `DropHelper`, so dropping it triggers a nested drop.
#[derive(Clone, Default)]
struct Nested {
_inner: DropHelper<SlowDropper>,
}

// Create twice `max_tasks` items so that more drops are submitted than
// `DEFAULT_DROPPER.max_tasks()` reports.
let num_items = DEFAULT_DROPPER.max_tasks() * 2;
let items = vec![DropHelper::new(Nested::default()); num_items];
// Drop all items from a separate thread, in parallel via rayon, so this
// thread can observe whether the dropping ever finishes.
let drop_thread = std::thread::spawn(move || {
items.into_par_iter().for_each(drop);
});

// Expect no deadlock: wait 200ms of leeway plus 200ms per item divided by
// the dropper's thread count, then require the dropping thread to be done.
// NOTE(review): presumably each `SlowDropper` drop takes about 200ms — confirm
// against its `Drop` impl.
sleep(Duration::from_millis(
200 + 200 * num_items as u64 / DEFAULT_DROPPER.num_threads() as u64,
));
assert!(drop_thread.is_finished(), "Drop queue deadlocked.");
}
}
6 changes: 5 additions & 1 deletion crates/aptos-drop-helper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
use crate::async_concurrent_dropper::AsyncConcurrentDropper;
use derive_more::{Deref, DerefMut};
use once_cell::sync::Lazy;
use std::mem::ManuallyDrop;
use std::{cell::Cell, mem::ManuallyDrop};

pub mod async_concurrent_dropper;
pub mod async_drop_queue;
mod metrics;

// Per-thread flag, initially `false`. A drop-pool task sets it to `true` on
// its worker thread before dropping the value; `schedule_drop_impl` reads it
// and, when set, drops the value inline instead of enqueueing another task.
thread_local! {
static IN_ANY_DROP_POOL: Cell<bool> = const { Cell::new(false) };
}

/// Lazily-initialized, process-wide `AsyncConcurrentDropper` named "default".
///
/// NOTE(review): the arguments `32` and `8` are presumably the max number of
/// pending tasks and the number of worker threads — confirm against
/// `AsyncConcurrentDropper::new`.
pub static DEFAULT_DROPPER: Lazy<AsyncConcurrentDropper> =
Lazy::new(|| AsyncConcurrentDropper::new("default", 32, 8));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl<'a> MoveTestAdapter<'a> for SimpleVMTestAdapter<'a> {
Compatibility::new(
!extra_args.skip_check_struct_layout,
!extra_args.skip_check_friend_linking,
false
false,
)
};
if vm.vm_config().use_loader_v2 {
Expand Down
Loading