Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prepare actix-rt v2.0.0 release #262

Merged
merged 4 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions actix-router/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ enum PatternElement {
}

#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
enum PatternType {
Static(String),
Prefix(String),
Expand Down
7 changes: 7 additions & 0 deletions actix-rt/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
## Unreleased - 2021-xx-xx


## 2.0.0 - 2021-02-02
* Remove all Arbiter-local storage methods. [#262]
* Re-export `tokio::pin`. [#262]

[#262]: https://github.com/actix/actix-net/pull/262


## 2.0.0-beta.3 - 2021-01-31
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
* Return `JoinHandle` from `actix_rt::spawn`. [#253]
Expand Down
5 changes: 3 additions & 2 deletions actix-rt/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[package]
name = "actix-rt"
version = "2.0.0-beta.3"
version = "2.0.0"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
keywords = ["async", "futures", "io", "runtime"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-rt"
Expand All @@ -30,3 +30,4 @@ tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }
28 changes: 28 additions & 0 deletions actix-rt/examples/hyper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
use std::net::SocketAddr;

async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::from("Hello World")))
}

fn main() {
actix_rt::System::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
})
.block_on(async {
let make_service =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

let server =
Server::bind(&SocketAddr::from(([127, 0, 0, 1], 3000))).serve(make_service);

if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
})
}
95 changes: 22 additions & 73 deletions actix-rt/src/arbiter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::{
any::{Any, TypeId},
cell::RefCell,
collections::HashMap,
fmt,
future::Future,
pin::Pin,
Expand All @@ -14,15 +12,14 @@ use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet};

use crate::{
runtime::Runtime,
runtime::{default_tokio_runtime, Runtime},
system::{System, SystemCommand},
};

pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);

thread_local!(
static HANDLE: RefCell<Option<ArbiterHandle>> = RefCell::new(None);
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);

pub(crate) enum ArbiterCommand {
Expand Down Expand Up @@ -97,16 +94,30 @@ pub struct Arbiter {
}

impl Arbiter {
/// Spawn new Arbiter thread and start its event loop.
/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let system_id = System::current().id();
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
Self::with_tokio_rt(|| {
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
})
}

/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
{
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);

let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel();

let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
Expand All @@ -116,18 +127,17 @@ impl Arbiter {
.spawn({
let tx = tx.clone();
move || {
let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime.");
let rt = Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx);

System::set_current(sys);

STORAGE.with(|cell| cell.borrow_mut().clear());
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));

// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(id, hnd));
.send(SystemCommand::RegisterArbiter(arb_id, hnd));

ready_tx.send(()).unwrap();

Expand All @@ -137,7 +147,7 @@ impl Arbiter {
// deregister arbiter
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(id));
.send(SystemCommand::DeregisterArbiter(arb_id));
}
})
.unwrap_or_else(|err| {
Expand All @@ -156,7 +166,6 @@ impl Arbiter {
let hnd = ArbiterHandle::new(tx);

HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());

local.spawn_local(ArbiterRunner { rx });

Expand Down Expand Up @@ -214,58 +223,6 @@ impl Arbiter {
pub fn join(self) -> thread::Result<()> {
self.thread_handle.join()
}

/// Insert item into Arbiter's thread-local storage.
///
/// Overwrites any item of the same type previously inserted.
#[deprecated = "Will be removed in stable v2."]
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
}

/// Check if Arbiter's thread-local storage contains an item type.
#[deprecated = "Will be removed in stable v2."]
pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::<T>()))
}

/// Call a function with a shared reference to an item in this Arbiter's thread-local storage.
///
/// # Panics
/// Panics if item is not in Arbiter's thread-local item storage.
#[deprecated = "Will be removed in stable v2."]
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();

let type_id = TypeId::of::<T>();
let item = st.get(&type_id).and_then(downcast_ref).unwrap();

f(item)
})
}

/// Call a function with a mutable reference to an item in this Arbiter's thread-local storage.
///
/// # Panics
/// Panics if item is not in Arbiter's thread-local item storage.
#[deprecated = "Will be removed in stable v2."]
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();

let type_id = TypeId::of::<T>();
let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap();

f(item)
})
}
}

/// A persistent future that processes [Arbiter] commands.
Expand Down Expand Up @@ -296,11 +253,3 @@ impl Future for ArbiterRunner {
}
}
}

fn downcast_ref<T: 'static>(boxed: &Box<dyn Any>) -> Option<&T> {
boxed.downcast_ref()
}

fn downcast_mut<T: 'static>(boxed: &mut Box<dyn Any>) -> Option<&mut T> {
boxed.downcast_mut()
}
4 changes: 3 additions & 1 deletion actix-rt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//!
//! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise
//! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the
//! blocking thread-pool using [`task::spawn_blocking`].
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```
Expand Down Expand Up @@ -56,6 +56,8 @@ pub use self::arbiter::{Arbiter, ArbiterHandle};
pub use self::runtime::Runtime;
pub use self::system::{System, SystemRunner};

pub use tokio::pin;

pub mod signal {
//! Asynchronous signal handling (Tokio re-exports).

Expand Down
25 changes: 19 additions & 6 deletions actix-rt/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{future::Future, io};

use tokio::task::{JoinHandle, LocalSet};

/// A single-threaded runtime based on Tokio's "current thread" runtime.
/// A Tokio-based runtime proxy.
///
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
/// on submitted futures.
Expand All @@ -12,14 +12,18 @@ pub struct Runtime {
rt: tokio::runtime::Runtime,
}

pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> {
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
}

impl Runtime {
/// Returns a new runtime initialized with default configuration values.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> io::Result<Runtime> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()?;
pub fn new() -> io::Result<Self> {
let rt = default_tokio_runtime()?;

Ok(Runtime {
rt,
Expand Down Expand Up @@ -81,3 +85,12 @@ impl Runtime {
self.local.block_on(&self.rt, f)
}
}

impl From<tokio::runtime::Runtime> for Runtime {
fn from(rt: tokio::runtime::Runtime) -> Self {
Self {
local: LocalSet::new(),
rt,
}
}
}
18 changes: 16 additions & 2 deletions actix-rt/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};

use crate::{arbiter::ArbiterHandle, Arbiter, Runtime};
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};

static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);

Expand All @@ -36,10 +36,24 @@ impl System {
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::with_tokio_rt(|| {
default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
}

/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();

let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
let system = System::construct(sys_tx, sys_arbiter.clone());

Expand Down
Loading