Skip to content

Commit

Permalink
Merge pull request #836 from Keruspe/multitask
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored Jul 26, 2020
2 parents 820acc1 + abc2929 commit f9c8974
Show file tree
Hide file tree
Showing 17 changed files with 139 additions and 46 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ jobs:
command: check
args: --features attributes

- name: build unstable only
uses: actions-rs/cargo@v1
with:
command: build
args: --no-default-features --features unstable

- name: tests
uses: actions-rs/cargo@v1
with:
Expand Down
18 changes: 15 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ rustdoc-args = ["--cfg", "feature=\"docs\""]
[features]
default = [
"std",
"async-executor",
"async-io",
"async-task",
"blocking",
"futures-lite",
"kv-log-macro",
"log",
"num_cpus",
"pin-project-lite",
"smol",
]
docs = ["attributes", "unstable", "default"]
unstable = [
Expand All @@ -54,7 +57,7 @@ alloc = [
"futures-core/alloc",
"pin-project-lite",
]
tokio02 = ["smol/tokio02"]
tokio02 = ["tokio"]

[dependencies]
async-attributes = { version = "1.1.1", optional = true }
Expand All @@ -77,7 +80,10 @@ futures-timer = { version = "3.0.2", optional = true }
surf = { version = "1.0.3", optional = true }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
smol = { version = "0.1.17", optional = true }
async-executor = { version = "0.1.1", features = ["async-io"], optional = true }
async-io = { version = "0.1.5", optional = true }
blocking = { version = "0.5.0", optional = true }
futures-lite = { version = "0.1.8", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] }
Expand All @@ -87,6 +93,12 @@ futures-channel = { version = "0.3.4", optional = true }
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3.10"

[dependencies.tokio]
version = "0.2"
default-features = false
features = ["rt-threaded"]
optional = true

[dev-dependencies]
femme = "1.3.0"
rand = "0.7.3"
Expand Down
2 changes: 1 addition & 1 deletion src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl Drop for File {
// non-blocking fashion, but our only other option here is losing data remaining in the
// write cache. Good task schedulers should be resilient to occasional blocking hiccups in
// file destructors so we don't expect this to be a common problem in practice.
let _ = smol::block_on(self.flush());
let _ = futures_lite::future::block_on(self.flush());
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;

use smol::Async;
use async_io::Async;

use crate::io;
use crate::net::{TcpStream, ToSocketAddrs};
Expand Down Expand Up @@ -81,7 +81,7 @@ impl TcpListener {
let addrs = addrs.to_socket_addrs().await?;

for addr in addrs {
match Async::<std::net::TcpListener>::bind(&addr) {
match Async::<std::net::TcpListener>::bind(addr) {
Ok(listener) => {
return Ok(TcpListener { watcher: listener });
}
Expand Down Expand Up @@ -227,7 +227,7 @@ cfg_unix! {

impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd()
self.watcher.into_inner().unwrap().into_raw_fd()
}
}
}
Expand All @@ -251,7 +251,7 @@ cfg_windows! {

impl IntoRawSocket for TcpListener {
fn into_raw_socket(self) -> RawSocket {
self.watcher.into_raw_socket()
self.watcher.into_inner().unwrap().into_raw_socket()
}
}
}
4 changes: 2 additions & 2 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::{IoSlice, IoSliceMut};
use std::net::SocketAddr;
use std::pin::Pin;

use smol::Async;
use async_io::Async;

use crate::io::{self, Read, Write};
use crate::net::ToSocketAddrs;
Expand Down Expand Up @@ -77,7 +77,7 @@ impl TcpStream {
let addrs = addrs.to_socket_addrs().await?;

for addr in addrs {
match Async::<std::net::TcpStream>::connect(&addr).await {
match Async::<std::net::TcpStream>::connect(addr).await {
Ok(stream) => {
return Ok(TcpStream {
watcher: Arc::new(stream),
Expand Down
8 changes: 4 additions & 4 deletions src/net/udp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io;
use std::net::SocketAddr;
use std::net::{Ipv4Addr, Ipv6Addr};

use smol::Async;
use async_io::Async;

use crate::net::ToSocketAddrs;
use crate::utils::Context as _;
Expand Down Expand Up @@ -74,7 +74,7 @@ impl UdpSocket {
let addrs = addrs.to_socket_addrs().await?;

for addr in addrs {
match Async::<std::net::UdpSocket>::bind(&addr) {
match Async::<std::net::UdpSocket>::bind(addr) {
Ok(socket) => {
return Ok(UdpSocket { watcher: socket });
}
Expand Down Expand Up @@ -506,7 +506,7 @@ cfg_unix! {

impl IntoRawFd for UdpSocket {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd()
self.watcher.into_inner().unwrap().into_raw_fd()
}
}
}
Expand All @@ -530,7 +530,7 @@ cfg_windows! {

impl IntoRawSocket for UdpSocket {
fn into_raw_socket(self) -> RawSocket {
self.watcher.into_raw_socket()
self.watcher.into_inner().unwrap().into_raw_socket()
}
}
}
4 changes: 2 additions & 2 deletions src/os/unix/net/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt;
use std::net::Shutdown;
use std::os::unix::net::UnixDatagram as StdUnixDatagram;

use smol::Async;
use async_io::Async;

use super::SocketAddr;
use crate::io;
Expand Down Expand Up @@ -335,6 +335,6 @@ impl FromRawFd for UnixDatagram {

impl IntoRawFd for UnixDatagram {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd()
self.watcher.into_inner().unwrap().into_raw_fd()
}
}
4 changes: 2 additions & 2 deletions src/os/unix/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::future::Future;
use std::os::unix::net::UnixListener as StdUnixListener;
use std::pin::Pin;

use smol::Async;
use async_io::Async;

use super::SocketAddr;
use super::UnixStream;
Expand Down Expand Up @@ -217,6 +217,6 @@ impl FromRawFd for UnixListener {

impl IntoRawFd for UnixListener {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd()
self.watcher.into_inner().unwrap().into_raw_fd()
}
}
2 changes: 1 addition & 1 deletion src/os/unix/net/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::net::Shutdown;
use std::os::unix::net::UnixStream as StdUnixStream;
use std::pin::Pin;

use smol::Async;
use async_io::Async;

use super::SocketAddr;
use crate::io::{self, Read, Write};
Expand Down
1 change: 1 addition & 0 deletions src/os/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ cfg_std! {
}

cfg_unstable! {
#[cfg(feature = "default")]
pub mod fs;
}
2 changes: 1 addition & 1 deletion src/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
for _ in 0..thread_count {
thread::Builder::new()
.name(thread_name.clone())
.spawn(|| crate::task::block_on(future::pending::<()>()))
.spawn(|| crate::task::executor::run_global(future::pending::<()>()))
.expect("cannot start a runtime thread");
}
Runtime {}
Expand Down
16 changes: 8 additions & 8 deletions src/task/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::task::{Context, Poll};
use pin_project_lite::pin_project;

use crate::io;
use crate::task::{JoinHandle, Task, TaskLocalsWrapper};
use crate::task::{self, JoinHandle, Task, TaskLocalsWrapper};

/// Task builder that configures the settings of a new task.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -61,9 +61,9 @@ impl Builder {
});

let task = wrapped.tag.task().clone();
let smol_task = smol::Task::spawn(wrapped).into();
let handle = task::executor::spawn(wrapped);

Ok(JoinHandle::new(smol_task, task))
Ok(JoinHandle::new(handle, task))
}

/// Spawns a task locally with the configured settings.
Expand All @@ -81,9 +81,9 @@ impl Builder {
});

let task = wrapped.tag.task().clone();
let smol_task = smol::Task::local(wrapped).into();
let handle = task::executor::local(wrapped);

Ok(JoinHandle::new(smol_task, task))
Ok(JoinHandle::new(handle, task))
}

/// Spawns a task locally with the configured settings.
Expand Down Expand Up @@ -166,10 +166,10 @@ impl Builder {
unsafe {
TaskLocalsWrapper::set_current(&wrapped.tag, || {
let res = if should_run {
// The first call should use run.
smol::run(wrapped)
// The first call should run the executor
task::executor::run(wrapped)
} else {
smol::block_on(wrapped)
futures_lite::future::block_on(wrapped)
};
num_nested_blocking.replace(num_nested_blocking.get() - 1);
res
Expand Down
72 changes: 72 additions & 0 deletions src/task/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::cell::RefCell;
use std::future::Future;

static GLOBAL_EXECUTOR: once_cell::sync::Lazy<async_executor::Executor> = once_cell::sync::Lazy::new(async_executor::Executor::new);

thread_local! {
static EXECUTOR: RefCell<async_executor::LocalExecutor> = RefCell::new(async_executor::LocalExecutor::new());
}

pub(crate) fn spawn<F, T>(future: F) -> async_executor::Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
GLOBAL_EXECUTOR.spawn(future)
}

#[cfg(feature = "unstable")]
pub(crate) fn local<F, T>(future: F) -> async_executor::Task<T>
where
F: Future<Output = T> + 'static,
T: 'static,
{
EXECUTOR.with(|executor| executor.borrow().spawn(future))
}

pub(crate) fn run<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
EXECUTOR.with(|executor| enter(|| GLOBAL_EXECUTOR.enter(|| executor.borrow().run(future))))
}

pub(crate) fn run_global<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
enter(|| GLOBAL_EXECUTOR.run(future))
}

/// Enters the tokio context if the `tokio` feature is enabled.
fn enter<T>(f: impl FnOnce() -> T) -> T {
#[cfg(not(feature = "tokio02"))]
return f();

#[cfg(feature = "tokio02")]
{
use std::cell::Cell;
use tokio::runtime::Runtime;

thread_local! {
/// The level of nested `enter` calls we are in, to ensure that the outermost always
/// has a runtime spawned.
static NESTING: Cell<usize> = Cell::new(0);
}

/// The global tokio runtime.
static RT: once_cell::sync::Lazy<Runtime> = once_cell::sync::Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));

NESTING.with(|nesting| {
let res = if nesting.get() == 0 {
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
}
21 changes: 12 additions & 9 deletions src/task/join_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct JoinHandle<T> {
}

#[cfg(not(target_os = "unknown"))]
type InnerHandle<T> = async_task::JoinHandle<T, ()>;
type InnerHandle<T> = async_executor::Task<T>;
#[cfg(target_arch = "wasm32")]
type InnerHandle<T> = futures_channel::oneshot::Receiver<T>;

Expand Down Expand Up @@ -54,8 +54,7 @@ impl<T> JoinHandle<T> {
#[cfg(not(target_os = "unknown"))]
pub async fn cancel(mut self) -> Option<T> {
let handle = self.handle.take().unwrap();
handle.cancel();
handle.await
handle.cancel().await
}

/// Cancel this task.
Expand All @@ -67,15 +66,19 @@ impl<T> JoinHandle<T> {
}
}

#[cfg(not(target_os = "unknown"))]
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.detach();
}
}
}

impl<T> Future for JoinHandle<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => {
Poll::Ready(output.expect("cannot await the result of a panicked task"))
}
}
Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx)
}
}
2 changes: 2 additions & 0 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ cfg_default! {
mod block_on;
mod builder;
mod current;
#[cfg(not(target_os = "unknown"))]
pub(crate) mod executor;
mod join_handle;
mod sleep;
#[cfg(not(target_os = "unknown"))]
Expand Down
Loading

0 comments on commit f9c8974

Please sign in to comment.