From 6fed1c3e7dfdda27b8809d6a1fd1e83d260cbf39 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 11 Oct 2021 09:58:11 +0800 Subject: [PATCH] add support for io-uring (#374) Co-authored-by: Rob Ede --- .cargo/config.toml | 20 ++++++++++- .github/workflows/ci.yml | 58 +++++++++++++++++------------- actix-rt/CHANGES.md | 2 ++ actix-rt/Cargo.toml | 4 +++ actix-rt/src/arbiter.rs | 72 ++++++++++++++++++++++++++++++++------ actix-rt/src/lib.rs | 7 ++++ actix-rt/src/runtime.rs | 5 --- actix-rt/src/system.rs | 2 +- actix-rt/tests/tests.rs | 46 ++++++++++++++++++++---- actix-server/CHANGES.md | 2 ++ actix-server/Cargo.toml | 1 + actix-server/src/worker.rs | 16 +++++++-- actix-tls/src/lib.rs | 1 + 13 files changed, 185 insertions(+), 51 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 16d75ced0f..0e5de4869f 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,23 @@ [alias] chk = "check --workspace --all-features --tests --examples --bins" lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo" -ci-test = "test --workspace --all-features --lib --tests --no-fail-fast -- --nocapture" + ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture" + +# just check the library (without dev deps) +ci-check-min = "hack --workspace check --no-default-features" +ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check" +ci-check-lib-linux = "hack --workspace --feature-powerset check" + +# check everything +ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples" +ci-check-linux = "hack --workspace --feature-powerset check --tests --examples" + +# tests avoiding io-uring feature +ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture" +ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture" +ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture" + +# test with io-uring feature +ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture" +ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a0f6291027..45841fb8a2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,36 +75,47 @@ jobs: command: install args: cargo-hack - - name: check minimal + - name: check lib + if: > + matrix.target.os != 'ubuntu-latest' + && matrix.target.triple != 'x86_64-pc-windows-gnu' uses: actions-rs/cargo@v1 - with: - command: hack - args: check --workspace --no-default-features - - - name: check minimal + tests + with: { command: ci-check-lib } + - name: check lib + if: matrix.target.os == 'ubuntu-latest' uses: actions-rs/cargo@v1 - with: - command: hack - args: check --workspace --no-default-features --tests --examples - - - name: check default + with: { command: ci-check-lib-linux } + - name: check lib + if: matrix.target.triple == 'x86_64-pc-windows-gnu' uses: actions-rs/cargo@v1 - with: - command: check - args: --workspace --tests --examples - + with: { command: ci-check-min } + - name: check full # TODO: compile OpenSSL and run tests on MinGW - if: matrix.target.triple != 'x86_64-pc-windows-gnu' + if: > + matrix.target.os != 'ubuntu-latest' + && matrix.target.triple != 'x86_64-pc-windows-gnu' uses: actions-rs/cargo@v1 - with: - command: check - args: --workspace --all-features --tests --examples + with: { command: ci-check } + - name: check all + if: matrix.target.os == 'ubuntu-latest' + uses: actions-rs/cargo@v1 + with: { command: ci-check-linux } - name: tests - if: matrix.target.triple != 'x86_64-pc-windows-gnu' - uses: actions-rs/cargo@v1 - with: { command: ci-test } + if: > + matrix.target.os != 'ubuntu-latest' + && matrix.target.triple != 'x86_64-pc-windows-gnu' + run: | + cargo ci-test + cargo ci-test-rt + cargo ci-test-server + - name: tests + if: matrix.target.os == 'ubuntu-latest' + run: | + cargo ci-test + cargo ci-test-rt-linux + cargo ci-test-server-linux - name: Generate coverage file if: > @@ -120,8 +131,7 @@ jobs: && matrix.version == 'stable' && github.ref == 'refs/heads/master' uses: codecov/codecov-action@v1 - with: - file: cobertura.xml + with: { file: cobertura.xml } - name: Clear the cargo caches run: | diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 42879e12b1..373640d321 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,9 +1,11 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `io-uring` feature for enabling async file I/O on linux. [#374] * The `spawn` method can now resolve with non-unit outputs. [#369] [#369]: https://github.com/actix/actix-net/pull/369 +[#374]: https://github.com/actix/actix-net/pull/374 ## 2.2.0 - 2021-03-29 diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index f4a90d2ce3..b466bb76c0 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -21,6 +21,7 @@ path = "src/lib.rs" [features] default = ["macros"] macros = ["actix-macros"] +io-uring = ["tokio-uring"] [dependencies] actix-macros = { version = "0.2.0", optional = true } @@ -28,6 +29,9 @@ actix-macros = { version = "0.2.0", optional = true } futures-core = { version = "0.3", default-features = false } tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } +[target.'cfg(target_os = "linux")'.dependencies] +tokio-uring = { version = "0.1", optional = true } + [dev-dependencies] tokio = { version = "1.2", features = ["full"] } hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 9ff1419db6..97084f0558 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -9,12 +9,9 @@ use std::{ }; use futures_core::ready; -use tokio::{sync::mpsc, task::LocalSet}; +use tokio::sync::mpsc; -use crate::{ - runtime::{default_tokio_runtime, Runtime}, - system::{System, SystemCommand}, -}; +use crate::system::{System, SystemCommand}; pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); @@ -98,16 +95,19 @@ impl Arbiter { /// /// # Panics /// Panics if a [System] is not registered on the current thread. + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { Self::with_tokio_rt(|| { - default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") + crate::runtime::default_tokio_runtime() + .expect("Cannot create new Arbiter's Runtime.") }) } /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// /// [tokio-runtime]: tokio::runtime::Runtime + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] #[doc(hidden)] pub fn with_tokio_rt(runtime_factory: F) -> Arbiter where @@ -127,7 +127,7 @@ impl Arbiter { .spawn({ let tx = tx.clone(); move || { - let rt = Runtime::from(runtime_factory()); + let rt = crate::runtime::Runtime::from(runtime_factory()); let hnd = ArbiterHandle::new(tx); System::set_current(sys); @@ -159,15 +159,67 @@ impl Arbiter { Arbiter { tx, thread_handle } } - /// Sets up an Arbiter runner in a new System using the provided runtime local task set. - pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { + /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[cfg(all(target_os = "linux", feature = "io-uring"))] + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + let sys = System::current(); + let system_id = sys.id(); + let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); + + let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); + let (tx, rx) = mpsc::unbounded_channel(); + + let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); + + let thread_handle = thread::Builder::new() + .name(name.clone()) + .spawn({ + let tx = tx.clone(); + move || { + let hnd = ArbiterHandle::new(tx); + + System::set_current(sys); + + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + + // register arbiter + let _ = System::current() + .tx() + .send(SystemCommand::RegisterArbiter(arb_id, hnd)); + + ready_tx.send(()).unwrap(); + + // run arbiter event processing loop + tokio_uring::start(ArbiterRunner { rx }); + + // deregister arbiter + let _ = System::current() + .tx() + .send(SystemCommand::DeregisterArbiter(arb_id)); + } + }) + .unwrap_or_else(|err| { + panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) + }); + + ready_rx.recv().unwrap(); + + Arbiter { tx, thread_handle } + } + + /// Sets up an Arbiter runner in a new System using the environment's local set. + pub(crate) fn in_new_system() -> ArbiterHandle { let (tx, rx) = mpsc::unbounded_channel(); let hnd = ArbiterHandle::new(tx); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - local.spawn_local(ArbiterRunner { rx }); + crate::spawn(ArbiterRunner { rx }); hnd } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 95afcac98d..e078dd0624 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -32,6 +32,10 @@ //! arbiter.stop(); //! arbiter.join().unwrap(); //! ``` +//! +//! # `io-uring` Support +//! There is experimental support for using io-uring with this crate by enabling the +//! `io-uring` feature. For now, it is semver exempt. #![deny(rust_2018_idioms, nonstandard_style)] #![allow(clippy::type_complexity)] @@ -39,6 +43,9 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] +#[cfg(all(not(target_os = "linux"), feature = "io-uring"))] +compile_error!("io_uring is a linux only feature."); + use std::future::Future; use tokio::task::JoinHandle; diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 1adbf6c0fc..25937003fe 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -31,11 +31,6 @@ impl Runtime { }) } - /// Reference to local task set. - pub(crate) fn local_set(&self) -> &LocalSet { - &self.local - } - /// Offload a future onto the single-threaded runtime. /// /// The returned join handle can be used to await the future's result. diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 3bc8a6e30e..4f262ede83 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -54,7 +54,7 @@ impl System { let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let rt = Runtime::from(runtime_factory()); - let sys_arbiter = Arbiter::in_new_system(rt.local_set()); + let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() }); let system = System::construct(sys_tx, sys_arbiter.clone()); system diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index e66696bf69..5fe1e89400 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,10 +1,6 @@ use std::{ future::Future, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::channel, - Arc, - }, + sync::mpsc::channel, thread, time::{Duration, Instant}, }; @@ -221,8 +217,8 @@ fn system_stop_stops_arbiters() { System::current().stop(); sys.run().unwrap(); - // account for slightly slow thread de-spawns (only observed on windows) - thread::sleep(Duration::from_millis(100)); + // account for slightly slow thread de-spawns + thread::sleep(Duration::from_millis(500)); // arbiter should be dead and return false assert!(!Arbiter::current().spawn_fn(|| {})); @@ -231,6 +227,7 @@ fn system_stop_stops_arbiters() { arb.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn new_system_with_tokio() { let (tx, rx) = channel(); @@ -263,8 +260,14 @@ fn new_system_with_tokio() { assert_eq!(rx.recv().unwrap(), 42); } +#[cfg(not(feature = "io-uring"))] #[test] fn new_arbiter_with_tokio() { + use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }; + let _ = System::new(); let arb = Arbiter::with_tokio_rt(|| { @@ -323,3 +326,32 @@ fn spawn_local() { h(actix_rt::spawn(async { 1 })); }) } + +#[cfg(all(target_os = "linux", feature = "io-uring"))] +#[test] +fn tokio_uring_arbiter() { + let system = System::new(); + let (tx, rx) = std::sync::mpsc::channel(); + + Arbiter::new().spawn(async move { + let handle = actix_rt::spawn(async move { + let f = tokio_uring::fs::File::create("test.txt").await.unwrap(); + let buf = b"Hello World!"; + + let (res, _) = f.write_at(&buf[..], 0).await; + assert!(res.is_ok()); + + f.sync_all().await.unwrap(); + f.close().await.unwrap(); + + std::fs::remove_file("test.txt").unwrap(); + }); + + handle.await.unwrap(); + tx.send(true).unwrap(); + }); + + assert!(rx.recv().unwrap()); + + drop(system); +} diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 86cde4f077..69f5b08cdc 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -3,11 +3,13 @@ ## Unreleased - 2021-xx-xx * Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349] * Remove `ServerBuilder::configure` [#349] +* Add `io-uring` feature for enabling async file I/O on linux. [#374] * Server no long listens to SIGHUP signal. It actually did not take any action when receiving SIGHUP, the only thing SIGHUP did was to stop the Server from receiving any future signal, because the `Signals` future stops on the first signal received [#389] +[#374]: https://github.com/actix/actix-net/pull/374 [#349]: https://github.com/actix/actix-net/pull/349 [#389]: https://github.com/actix/actix-net/pull/389 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 58471cf910..89e1d4e2cf 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -18,6 +18,7 @@ path = "src/lib.rs" [features] default = [] +io-uring = ["actix-rt/io-uring"] [dependencies] actix-rt = { version = "2.0.0", default-features = false } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index a974522aa3..21f9802733 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -280,14 +280,24 @@ impl ServerWorker { let counter_clone = counter.clone(); // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. - Arbiter::with_tokio_rt(move || { + #[cfg(all(target_os = "linux", feature = "io-uring"))] + let arbiter = { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + Arbiter::new() + }; + + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + let arbiter = Arbiter::with_tokio_rt(move || { tokio::runtime::Builder::new_current_thread() .enable_all() .max_blocking_threads(config.max_blocking_threads) .build() .unwrap() - }) - .spawn(async move { + }); + + arbiter.spawn(async move { let fut = factories .iter() .enumerate() diff --git a/actix-tls/src/lib.rs b/actix-tls/src/lib.rs index 83e18d5855..dbda883499 100644 --- a/actix-tls/src/lib.rs +++ b/actix-tls/src/lib.rs @@ -5,6 +5,7 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #[cfg(feature = "openssl")] +#[allow(unused_extern_crates)] extern crate tls_openssl as openssl; #[cfg(feature = "accept")]