Skip to content

Commit

Permalink
rt(alt): fix memory leak and increase max preemption when running Loo…
Browse files Browse the repository at this point in the history
…m CI tests (#5911)

The memory leak was caused by a bug during shutdown where some state was leaked.
  • Loading branch information
carllerche committed Aug 10, 2023
1 parent 5d29bdf commit dd23f08
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 108 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,3 @@ jobs:
working-directory: tokio
env:
SCOPE: ${{ matrix.scope }}
# TODO: remove this before stabilizing
LOOM_MAX_PREEMPTIONS: 1
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Handle {
}

pub(crate) fn shutdown(&self) {
self.shared.close();
self.shared.close(self);
self.driver.unpark();
}

Expand Down
34 changes: 16 additions & 18 deletions tokio/src/runtime/scheduler/multi_thread_alt/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::MutexGuard;
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Shared};
use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared};

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};

Expand Down Expand Up @@ -146,23 +146,13 @@ impl Idle {
// Find a sleeping worker
if let Some(worker) = synced.idle.sleepers.pop() {
// Find an available core
if let Some(mut core) = synced.idle.available_cores.pop() {
if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) {
debug_assert!(!core.is_searching);
core.is_searching = true;

self.idle_map.unset(core.index);
debug_assert!(self.idle_map.matches(&synced.idle.available_cores));

// Assign the core to the worker
synced.assigned_cores[worker] = Some(core);

let num_idle = synced.idle.available_cores.len();
#[cfg(not(loom))]
debug_assert_eq!(num_idle, self.num_idle.load(Acquire) - 1);

// Update the number of sleeping workers
self.num_idle.store(num_idle, Release);

// Drop the lock before notifying the condvar.
drop(synced);

Expand Down Expand Up @@ -198,6 +188,7 @@ impl Idle {

for _ in 0..num {
if let Some(worker) = synced.idle.sleepers.pop() {
// TODO: can this be switched to use next_available_core?
if let Some(core) = synced.idle.available_cores.pop() {
debug_assert!(!core.is_searching);

Expand Down Expand Up @@ -236,15 +227,10 @@ impl Idle {
// eventually find the cores and shut them down.
while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() {
let worker = synced.idle.sleepers.pop().unwrap();
let core = synced.idle.available_cores.pop().unwrap();

self.idle_map.unset(core.index);
let core = self.try_acquire_available_core(&mut synced.idle).unwrap();

synced.assigned_cores[worker] = Some(core);
shared.condvars[worker].notify_one();

self.num_idle
.store(synced.idle.available_cores.len(), Release);
}

debug_assert!(self.idle_map.matches(&synced.idle.available_cores));
Expand All @@ -255,6 +241,18 @@ impl Idle {
}
}

pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) {
// If there are any remaining cores, shut them down here.
//
// This code is a bit convoluted to avoid lock-reentry.
while let Some(core) = {
let mut synced = shared.synced.lock();
self.try_acquire_available_core(&mut synced.idle)
} {
shared.shutdown_core(handle, core);
}
}

/// The worker releases the given core, making it available to other workers
/// that are waiting.
pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) {
Expand Down
194 changes: 107 additions & 87 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ impl Worker {
// First try to acquire an available core
if let Some(core) = self.try_acquire_available_core(cx, &mut synced) {
// Try to poll a task from the global queue
let maybe_task = self.next_remote_task_synced(cx, &mut synced);
let maybe_task = cx.shared().next_remote_task_synced(&mut synced);
(maybe_task, core)
} else {
// block the thread to wait for a core to be assinged to us
Expand Down Expand Up @@ -589,10 +589,7 @@ impl Worker {
}
}

self.pre_shutdown(cx, &mut core);

// Signal shutdown
self.shutdown_core(cx, core);
cx.shared().shutdown_core(&cx.handle, core);

// It is possible that tasks wake others during drop, so we need to
// clear the defer list.
Expand Down Expand Up @@ -746,7 +743,7 @@ impl Worker {
}
}

if let Some(task) = self.next_local_task(&mut core) {
if let Some(task) = core.next_local_task() {
return Ok((Some(task), core));
}

Expand All @@ -759,12 +756,7 @@ impl Worker {
}

let mut synced = cx.shared().synced.lock();
self.next_remote_task_synced(cx, &mut synced)
}

fn next_remote_task_synced(&self, cx: &Context, synced: &mut Synced) -> Option<Notified> {
// safety: we only have access to a valid `Synced` in this file.
unsafe { cx.shared().inject.pop(&mut synced.inject) }
cx.shared().next_remote_task_synced(&mut synced)
}

fn next_remote_task_batch(&self, cx: &Context, mut core: Box<Core>) -> NextTaskResult {
Expand Down Expand Up @@ -818,14 +810,6 @@ impl Worker {
ret
}

fn next_local_task(&self, core: &mut Core) -> Option<Notified> {
self.next_lifo_task(core).or_else(|| core.run_queue.pop())
}

fn next_lifo_task(&self, core: &mut Core) -> Option<Notified> {
core.lifo_slot.take()
}

/// Function responsible for stealing tasks from another worker
///
/// Note: Only if less than half the workers are searching for tasks to steal
Expand Down Expand Up @@ -948,7 +932,7 @@ impl Worker {
};

// Check for a task in the LIFO slot
let task = match self.next_lifo_task(&mut core) {
let task = match core.next_lifo_task() {
Some(task) => task,
None => {
self.reset_lifo_enabled(cx);
Expand Down Expand Up @@ -1229,7 +1213,7 @@ impl Worker {
if cx.shared().inject.is_closed(&mut synced.inject) {
synced.shutdown_driver = Some(driver);
self.shutdown_clear_defer(cx);
self.shutdown_finalize(cx, synced);
cx.shared().shutdown_finalize(&cx.handle, &mut synced);
return Err(());
}

Expand Down Expand Up @@ -1281,61 +1265,6 @@ impl Worker {
core.lifo_slot.is_some() || !core.run_queue.is_empty()
}

/// Signals all tasks to shut down, and waits for them to complete. Must run
/// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&self, cx: &Context, core: &mut Core) {
// Signal to all tasks to shut down.
cx.shared().owned.close_and_shutdown_all();

core.stats.submit(&cx.shared().worker_metrics[core.index]);
}

/// Signals that a worker has observed the shutdown signal and has replaced
/// its core back into its handle.
///
/// If all workers have reached this point, the final cleanup is performed.
fn shutdown_core(&self, cx: &Context, core: Box<Core>) {
let mut synced = cx.shared().synced.lock();
synced.shutdown_cores.push(core);

self.shutdown_finalize(cx, synced);
}

fn shutdown_finalize(&self, cx: &Context, mut synced: MutexGuard<'_, Synced>) {
// Wait for all cores
if synced.shutdown_cores.len() != cx.shared().remotes.len() {
return;
}

let driver = synced.shutdown_driver.take();

if cx.shared().driver_enabled() && driver.is_none() {
return;
}

debug_assert!(cx.shared().owned.is_empty());

for mut core in synced.shutdown_cores.drain(..) {
// Drain tasks from the local queue
while self.next_local_task(&mut core).is_some() {}
}

// Shutdown the driver
if let Some(mut driver) = driver {
driver.shutdown(&cx.handle.driver);
}

// Drain the injection queue
//
// We already shut down every task, so we can simply drop the tasks. We
// cannot call `next_remote_task()` because we already hold the lock.
//
// safety: passing in correct `idle::Synced`
while let Some(task) = self.next_remote_task_synced(cx, &mut synced) {
drop(task);
}
}

fn reset_lifo_enabled(&self, cx: &Context) {
cx.lifo_enabled
.set(!cx.handle.shared.config.disable_lifo_slot);
Expand Down Expand Up @@ -1379,7 +1308,22 @@ impl Context {
}
}

impl Core {
fn next_local_task(&mut self) -> Option<Notified> {
self.next_lifo_task().or_else(|| self.run_queue.pop())
}

fn next_lifo_task(&mut self) -> Option<Notified> {
self.lifo_slot.take()
}
}

impl Shared {
fn next_remote_task_synced(&self, synced: &mut Synced) -> Option<Notified> {
// safety: we only have access to a valid `Synced` in this file.
unsafe { self.inject.pop(&mut synced.inject) }
}

pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
use std::ptr;

Expand Down Expand Up @@ -1449,17 +1393,25 @@ impl Shared {
self.idle.notify_remote(synced, self);
}

pub(super) fn close(&self) {
let mut synced = self.synced.lock();
pub(super) fn close(&self, handle: &Handle) {
{
let mut synced = self.synced.lock();

if let Some(driver) = self.driver.take() {
synced.shutdown_driver = Some(driver);
}
if let Some(driver) = self.driver.take() {
synced.shutdown_driver = Some(driver);
}

if !self.inject.close(&mut synced.inject) {
return;
}

if self.inject.close(&mut synced.inject) {
// Set the shutdown flag on all available cores
self.idle.shutdown(&mut synced, self);
}

// Any unassigned cores need to be shutdown, but we have to first drop
// the lock
self.idle.shutdown_unassigned_cores(handle, self);
}

fn push_remote_task(&self, synced: &mut Synced, task: Notified) {
Expand Down Expand Up @@ -1498,6 +1450,52 @@ impl Shared {
fn driver_enabled(&self) -> bool {
self.condvars.len() > self.remotes.len()
}

pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
self.owned.close_and_shutdown_all();

core.stats.submit(&self.worker_metrics[core.index]);

let mut synced = self.synced.lock();
synced.shutdown_cores.push(core);

self.shutdown_finalize(handle, &mut synced);
}

pub(super) fn shutdown_finalize(&self, handle: &Handle, synced: &mut Synced) {
// Wait for all cores
if synced.shutdown_cores.len() != self.remotes.len() {
return;
}

let driver = synced.shutdown_driver.take();

if self.driver_enabled() && driver.is_none() {
return;
}

debug_assert!(self.owned.is_empty());

for mut core in synced.shutdown_cores.drain(..) {
// Drain tasks from the local queue
while core.next_local_task().is_some() {}
}

// Shutdown the driver
if let Some(mut driver) = driver {
driver.shutdown(&handle.driver);
}

// Drain the injection queue
//
// We already shut down every task, so we can simply drop the tasks. We
// cannot call `next_remote_task()` because we already hold the lock.
//
// safety: passing in correct `idle::Synced`
while let Some(task) = self.next_remote_task_synced(synced) {
drop(task);
}
}
}

impl Overflow<Arc<Handle>> for Shared {
Expand All @@ -1514,10 +1512,20 @@ impl Overflow<Arc<Handle>> for Shared {
}

impl<'a> Lock<inject::Synced> for &'a Shared {
type Handle = InjectGuard<'a>;
type Handle = SyncedGuard<'a>;

fn lock(self) -> Self::Handle {
InjectGuard {
SyncedGuard {
lock: self.synced.lock(),
}
}
}

impl<'a> Lock<Synced> for &'a Shared {
type Handle = SyncedGuard<'a>;

fn lock(self) -> Self::Handle {
SyncedGuard {
lock: self.synced.lock(),
}
}
Expand All @@ -1537,16 +1545,28 @@ impl task::Schedule for Arc<Handle> {
}
}

pub(crate) struct InjectGuard<'a> {
impl AsMut<Synced> for Synced {
fn as_mut(&mut self) -> &mut Synced {
self
}
}

pub(crate) struct SyncedGuard<'a> {
lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
impl<'a> AsMut<inject::Synced> for SyncedGuard<'a> {
fn as_mut(&mut self) -> &mut inject::Synced {
&mut self.lock.inject
}
}

impl<'a> AsMut<Synced> for SyncedGuard<'a> {
fn as_mut(&mut self) -> &mut Synced {
&mut self.lock
}
}

#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
use scheduler::Context::MultiThreadAlt;
Expand Down

0 comments on commit dd23f08

Please sign in to comment.