Skip to content

Commit

Permalink
rt: initial implementation of new threaded runtime (#5823)
Browse files Browse the repository at this point in the history
This patch includes an initial implementation of a new multi-threaded
runtime. The new runtime aims to increase the scheduler throughput by
speeding up how it dispatches work to peer worker threads. This
implementation improves most benchmarks by about ~10% when the number of
threads is below 16. As threads increase, mutex contention deteriorates
performance.

Because the new scheduler is not yet ready to replace the old one, the
patch introduces it as an unstable runtime flavor with a warning that it
isn't production ready. Work to improve the scalability of the runtime
will most likely require more intrusive changes across Tokio, so I am
opting to merge with master to avoid larger conflicts.
  • Loading branch information
carllerche committed Jul 21, 2023
1 parent 63577cd commit 4165601
Show file tree
Hide file tree
Showing 45 changed files with 5,415 additions and 105 deletions.
30 changes: 25 additions & 5 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@

R-loom:
R-loom-sync:
- tokio/src/sync/*
- tokio/src/sync/**/*
- tokio-util/src/sync/*
- tokio-util/src/sync/**/*
- tokio/src/runtime/*
- tokio/src/runtime/**/*

R-loom-time-driver:
- tokio/src/runtime/time/*
- tokio/src/runtime/time/**/*

R-loom-current-thread:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/current_thread/*
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**

R-loom-multi-thread:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/multi_thread/*
- tokio/src/runtime/scheduler/multi_thread/**
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**

R-loom-multi-thread-alt:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/multi_thread_alt/*
- tokio/src/runtime/scheduler/multi_thread_alt/**
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**
106 changes: 86 additions & 20 deletions .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ on:
name: Loom

env:
RUSTFLAGS: -Dwarnings
RUSTFLAGS: -Dwarnings --cfg loom --cfg tokio_unstable -C debug_assertions
LOOM_MAX_PREEMPTIONS: 2
LOOM_MAX_BRANCHES: 10000
RUST_BACKTRACE: 1
# Change to specific Rust release to pin
rust_stable: stable
Expand All @@ -17,26 +19,91 @@ permissions:
contents: read

jobs:
loom:
name: loom
loom-sync:
name: loom tokio::sync
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom') || (github.base_ref == null))
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-sync') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture sync::tests
working-directory: tokio

loom-time-driver:
name: loom time driver
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-time-driver') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture runtime::time::tests
working-directory: tokio

loom-current-thread:
name: loom current-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-current-thread') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture loom_current_thread
working-directory: tokio

loom-multi-thread:
name: loom multi-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-multi-thread') || (github.base_ref == null))
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scope: loom_multi_thread::group_a
- scope: loom_multi_thread::group_b
- scope: loom_multi_thread::group_c
- scope: loom_multi_thread::group_d
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: loom ${{ matrix.scope }}
run: cargo test --lib --release --features full -- $SCOPE
working-directory: tokio
env:
SCOPE: ${{ matrix.scope }}

loom-multi-thread-alt:
name: loom ALT multi-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-multi-thread-alt') || (github.base_ref == null))
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scope: --skip loom_pool
max_preemptions: 2
- scope: loom_pool::group_a
max_preemptions: 2
- scope: loom_pool::group_b
max_preemptions: 2
- scope: loom_pool::group_c
max_preemptions: 2
- scope: loom_pool::group_d
max_preemptions: 2
- scope: time::driver
max_preemptions: 2
- scope: loom_multi_thread_alt::group_a
- scope: loom_multi_thread_alt::group_b
- scope: loom_multi_thread_alt::group_c
- scope: loom_multi_thread_alt::group_d
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
Expand All @@ -45,10 +112,9 @@ jobs:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: loom ${{ matrix.scope }}
run: cargo test --lib --release --features full -- --nocapture $SCOPE
run: cargo test --lib --release --features full -- $SCOPE
working-directory: tokio
env:
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings -C debug-assertions
LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
LOOM_MAX_BRANCHES: 10000
SCOPE: ${{ matrix.scope }}
# TODO: remove this before stabilizing
LOOM_MAX_PREEMPTIONS: 1
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ wasm-bindgen-test = "0.3.0"
mio-aio = { version = "0.7.0", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5.2", features = ["futures", "checkpoint"] }
loom = { version = "0.6", features = ["futures", "checkpoint"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/loom/std/unsafe_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ impl<T> UnsafeCell<T> {
UnsafeCell(std::cell::UnsafeCell::new(data))
}

#[inline(always)]
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}

#[inline(always)]
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/blocking/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ impl BlockingSchedule {
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
BlockingSchedule {
Expand All @@ -45,6 +47,8 @@ impl task::Schedule for BlockingSchedule {
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
None
Expand Down
68 changes: 68 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ pub(crate) enum Kind {
CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt,
}

impl Builder {
Expand Down Expand Up @@ -230,6 +232,26 @@ impl Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61)
}

cfg_unstable! {
/// Returns a new builder with the alternate multi thread scheduler
/// selected.
///
/// The alternate multi threaded scheduler is an in-progress
/// candidate to replace the existing multi threaded scheduler. It
/// currently does not scale as well to 16+ processors.
///
/// This runtime flavor is currently **not considered production
/// ready**.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread_alt() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThreadAlt, 61)
}
}
}

/// Returns a new runtime builder initialized with default configuration
Expand Down Expand Up @@ -656,6 +678,8 @@ impl Builder {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => self.build_threaded_runtime(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
}
}

Expand All @@ -665,6 +689,8 @@ impl Builder {
Kind::CurrentThread => true,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => false,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThreadAlt => false,
},
enable_io: self.enable_io,
enable_time: self.enable_time,
Expand Down Expand Up @@ -1214,6 +1240,48 @@ cfg_rt_multi_thread! {

Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}

cfg_unstable! {
fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::MultiThreadAlt;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();

let (scheduler, handle) = MultiThreadAlt::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);

Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
}
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ impl Handle {
scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => RuntimeFlavor::MultiThreadAlt,
}
}

Expand Down Expand Up @@ -385,6 +387,8 @@ impl Handle {
scheduler::Handle::CurrentThread(handle) => handle.owned_id(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(handle) => handle.owned_id(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(handle) => handle.owned_id(),
};
owned_id.into()
}
Expand Down Expand Up @@ -535,6 +539,8 @@ cfg_taskdump! {
handle.dump().await
}).await
},
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"),
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use std::time::Duration;
cfg_rt_multi_thread! {
use crate::runtime::Builder;
use crate::runtime::scheduler::MultiThread;

cfg_unstable! {
use crate::runtime::scheduler::MultiThreadAlt;
}
}

/// The Tokio runtime.
Expand Down Expand Up @@ -109,6 +113,9 @@ pub enum RuntimeFlavor {
CurrentThread,
/// The flavor that executes tasks across multiple threads.
MultiThread,
/// The flavor that executes tasks across multiple threads.
#[cfg(tokio_unstable)]
MultiThreadAlt,
}

/// The runtime scheduler is either a multi-thread or a current-thread executor.
Expand All @@ -120,6 +127,10 @@ pub(super) enum Scheduler {
/// Execute tasks across multiple threads.
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread(MultiThread),

/// Execute tasks across multiple threads.
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt(MultiThreadAlt),
}

impl Runtime {
Expand Down Expand Up @@ -336,6 +347,8 @@ impl Runtime {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future),
}
}

Expand Down Expand Up @@ -456,6 +469,12 @@ impl Drop for Runtime {
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThreadAlt(multi_thread) => {
// The threaded scheduler drops its tasks on its worker threads, which is
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
}
}
}
Expand Down
Loading

0 comments on commit 4165601

Please sign in to comment.