Skip to content

Commit

Permalink
rt: move driver unpark out of multi-thread parker (#5026)
Browse files Browse the repository at this point in the history
This patch removes the driver Unpark handle out of the multi-thread
parker and passes a reference in when it is needed. This is a first step
towards getting rid of the separate driver unpark handle in favor of
just using the regular driver handle. Because the regular driver handle
is owned at a higher level (at the top of the worker struct).
  • Loading branch information
carllerche authored Sep 17, 2022
1 parent cdd6eea commit cba5c10
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 17 deletions.
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ impl MultiThread {
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, Launch) {
let driver_unpark = driver.unpark();
let parker = Parker::new(driver);
let (handle, launch) = worker::create(
size,
parker,
driver_handle,
driver_unpark,
blocking_spawner,
seed_generator,
config,
Expand Down
20 changes: 5 additions & 15 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::driver::{Driver, Unpark};
use crate::runtime::driver::{self, Driver};
use crate::util::TryLock;

use std::sync::atomic::Ordering::SeqCst;
Expand Down Expand Up @@ -42,23 +42,17 @@ const NOTIFIED: usize = 3;
struct Shared {
/// Shared driver. Only one thread at a time can use this
driver: TryLock<Driver>,

/// Unpark handle
handle: Unpark,
}

impl Parker {
pub(crate) fn new(driver: Driver) -> Parker {
let handle = driver.unpark();

Parker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
shared: Arc::new(Shared {
driver: TryLock::new(driver),
handle,
}),
}),
}
Expand Down Expand Up @@ -102,8 +96,8 @@ impl Clone for Parker {
}

impl Unparker {
pub(crate) fn unpark(&self) {
self.inner.unpark();
pub(crate) fn unpark(&self, driver: &driver::Unpark) {
self.inner.unpark(driver);
}
}

Expand Down Expand Up @@ -201,7 +195,7 @@ impl Inner {
}
}

fn unpark(&self) {
fn unpark(&self, driver: &driver::Unpark) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
Expand All @@ -211,7 +205,7 @@ impl Inner {
EMPTY => {} // no one was waiting
NOTIFIED => {} // already unparked
PARKED_CONDVAR => self.unpark_condvar(),
PARKED_DRIVER => self.unpark_driver(),
PARKED_DRIVER => driver.unpark(),
actual => panic!("inconsistent state in unpark; actual = {}", actual),
}
}
Expand All @@ -233,10 +227,6 @@ impl Inner {
self.condvar.notify_one()
}

fn unpark_driver(&self) {
self.shared.handle.unpark();
}

fn shutdown(&self) {
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown();
Expand Down
9 changes: 7 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ pub(super) struct Shared {
/// how they communicate between each other.
remotes: Box<[Remote]>,

/// Used to unpark threads blocked on the I/O driver
driver: driver::Unpark,

/// Global task queue used for:
/// 1. Submit work to the scheduler while **not** currently on a worker thread.
/// 2. Submit work to the scheduler when a worker run queue is saturated
Expand Down Expand Up @@ -190,6 +193,7 @@ pub(super) fn create(
size: usize,
park: Parker,
driver_handle: driver::Handle,
driver_unpark: driver::Unpark,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
Expand Down Expand Up @@ -223,6 +227,7 @@ pub(super) fn create(
let handle = Arc::new(Handle {
shared: Shared {
remotes: remotes.into_boxed_slice(),
driver: driver_unpark,
inject: Inject::new(),
idle: Idle::new(size),
owned: OwnedTasks::new(),
Expand Down Expand Up @@ -774,13 +779,13 @@ impl Shared {

fn notify_parked(&self) {
if let Some(index) = self.idle.worker_to_notify() {
self.remotes[index].unpark.unpark();
self.remotes[index].unpark.unpark(&self.driver);
}
}

fn notify_all(&self) {
for remote in &self.remotes[..] {
remote.unpark.unpark();
remote.unpark.unpark(&self.driver);
}
}

Expand Down

0 comments on commit cba5c10

Please sign in to comment.