Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server: run after await #425

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions actix-server/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Changes

## Unreleased - 2021-xx-xx
* `Server` now runs only after awaiting it. [#425]

[#425]: https://github.com/actix/actix-net/pull/425


## 2.0.0-beta.9 - 2021-11-15
Expand Down
4 changes: 2 additions & 2 deletions actix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ log = "0.4"
mio = { version = "0.8", features = ["os-poll", "net"] }
num_cpus = "1.13"
socket2 = "0.4.2"
tokio = { version = "1.5.1", features = ["sync"] }
tokio = { version = "1.5.1", features = ["sync", "macros"] }

# runtime for io-uring feature
tokio-uring = { version = "0.1", optional = true }
Expand All @@ -42,5 +42,5 @@ actix-rt = "2.4.0"

bytes = "1"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }
221 changes: 98 additions & 123 deletions actix-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use std::{
use actix_rt::{time::sleep, System};
use futures_core::future::BoxFuture;
use log::{error, info};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};

use crate::{
accept::Accept,
Expand Down Expand Up @@ -60,8 +57,8 @@ pub(crate) enum ServerCommand {
/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
/// distributes connections with a round-robin strategy.
///
/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve
/// when the server has fully shut down.
/// The [Server] must be awaited in order to run.
/// It will resolve when the server has fully shut down.
///
/// # Shutdown Signals
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
Expand Down Expand Up @@ -120,9 +117,9 @@ pub(crate) enum ServerCommand {
/// }
/// ```
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub enum Server {
Server(ServerInner),
Error(Option<io::Error>),
pub struct Server {
handle: ServerHandle,
fut: BoxFuture<'static, io::Result<()>>,
}

impl Server {
Expand All @@ -131,7 +128,64 @@ impl Server {
ServerBuilder::default()
}

pub(crate) fn new(mut builder: ServerBuilder) -> Self {
pub(crate) fn new(builder: ServerBuilder) -> Self {
Server {
handle: ServerHandle::new(builder.cmd_tx.clone()),
fut: Box::pin(ServerInner::run(builder)),
}
}

/// Get a handle for ServerFuture that can be used to change state of actix server.
///
/// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
self.handle.clone()
}
}

impl Future for Server {
type Output = io::Result<()>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
}
}

pub struct ServerInner {
worker_handles: Vec<WorkerHandleServer>,
worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>,
exit: bool,
waker_queue: WakerQueue,
stopped: bool,
}

impl ServerInner {
async fn run(builder: ServerBuilder) -> io::Result<()> {
let (mut this, mut cmd_rx, mut signal_fut) = Self::run_sync(builder)?;
let listen_to_signals = signal_fut.is_some();

while !this.stopped {
tokio::select! {
signal = async {
signal_fut.as_mut().unwrap().await
}, if listen_to_signals => {
this.handle_signal(signal).await;
},
Some(cmd) = cmd_rx.recv() => {
this.handle_cmd(cmd).await;
},
else => break,
};
}

Ok(())
}

fn run_sync(
mut builder: ServerBuilder,
) -> io::Result<(Self, UnboundedReceiver<ServerCommand>, Option<Signals>)> {
let sockets = mem::take(&mut builder.sockets)
.into_iter()
.map(|t| (t.0, t.2))
Expand All @@ -156,142 +210,62 @@ impl Server {
);
}

match Accept::start(sockets, &builder) {
Ok((waker_queue, worker_handles)) => {
// construct OS signals listener future
let signals = (builder.listen_os_signals).then(Signals::new);

Self::Server(ServerInner {
cmd_tx: builder.cmd_tx.clone(),
cmd_rx: builder.cmd_rx,
signals,
waker_queue,
worker_handles,
worker_config: builder.worker_config,
services: builder.factories,
exit: builder.exit,
stop_task: None,
})
}
let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?;

Err(err) => Self::Error(Some(err)),
}
}
// construct OS signals listener future
let signal_fut = (builder.listen_os_signals).then(Signals::new);

/// Get a handle for ServerFuture that can be used to change state of actix server.
///
/// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
match self {
Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()),
Server::Error(err) => {
// TODO: i don't think this is the best way to handle server startup fail
panic!(
"server handle can not be obtained because server failed to start up: {}",
err.as_ref().unwrap()
);
}
}
}
}

impl Future for Server {
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().get_mut() {
Self::Error(err) => Poll::Ready(Err(err
.take()
.expect("Server future cannot be polled after error"))),

Self::Server(inner) => {
// poll Signals
if let Some(ref mut signals) = inner.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
inner.stop_task = inner.handle_signal(signal);
// drop signals listener
inner.signals = None;
}
}
let server = ServerInner {
waker_queue,
worker_handles,
worker_config: builder.worker_config,
services: builder.factories,
exit: builder.exit,
stopped: false,
};

// handle stop tasks and eager drain command channel
loop {
if let Some(ref mut fut) = inner.stop_task {
// only resolve stop task and exit
return fut.as_mut().poll(cx).map(|_| Ok(()));
}

match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
Poll::Ready(Some(cmd)) => {
// if stop task is required, set it and loop
inner.stop_task = inner.handle_cmd(cmd);
}
_ => return Poll::Pending,
}
}
}
}
Ok((server, builder.cmd_rx, signal_fut))
}
}

pub struct ServerInner {
worker_handles: Vec<WorkerHandleServer>,
worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>,
exit: bool,
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
signals: Option<Signals>,
waker_queue: WakerQueue,
stop_task: Option<BoxFuture<'static, ()>>,
}

impl ServerInner {
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
async fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(());
None
}

ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(());
None
}

ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
self.stopped = true;

// stop accept thread
self.waker_queue.wake(WakerInterest::Stop);

// stop workers
let workers_stop = self
.worker_handles
.iter()
.map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();

Some(Box::pin(async move {
if graceful {
// wait for all workers to shut down
let _ = join_all(workers_stop).await;
}
if graceful {
// wait for all workers to shut down
let workers_stop = self
.worker_handles
.iter()
.map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();
let _ = join_all(workers_stop).await;
}

if let Some(tx) = completion {
let _ = tx.send(());
}
if let Some(tx) = completion {
let _ = tx.send(());
}

if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}))
if self.exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}

ServerCommand::WorkerFaulted(idx) => {
Expand Down Expand Up @@ -324,13 +298,11 @@ impl ServerInner {

Err(err) => error!("can not restart worker {}: {}", idx, err),
};

None
}
}
}

fn handle_signal(&mut self, signal: SignalKind) -> Option<BoxFuture<'static, ()>> {
async fn handle_signal(&mut self, signal: SignalKind) {
match signal {
SignalKind::Int => {
info!("SIGINT received; starting forced shutdown");
Expand All @@ -339,6 +311,7 @@ impl ServerInner {
graceful: false,
completion: None,
})
.await
}

SignalKind::Term => {
Expand All @@ -348,6 +321,7 @@ impl ServerInner {
graceful: true,
completion: None,
})
.await
}

SignalKind::Quit => {
Expand All @@ -357,6 +331,7 @@ impl ServerInner {
graceful: false,
completion: None,
})
.await
}
}
}
Expand Down
37 changes: 28 additions & 9 deletions actix-server/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,27 +487,46 @@ async fn worker_restart() {
}

#[test]
#[should_panic]
fn no_runtime() {
// test set up in a way that would prevent time out if support for runtime-less init was added
fn no_runtime_on_init() {
use std::{thread::sleep, time::Duration};

let addr = unused_addr();
let counter = Arc::new(AtomicUsize::new(0));

let srv = Server::build()
.workers(1)
let mut srv = Server::build()
.workers(2)
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
.bind("test", addr, {
let counter = counter.clone();
move || {
counter.fetch_add(1, Ordering::SeqCst);
fn_service(|_| async { Ok::<_, ()>(()) })
}
})
.unwrap()
.run();

fn is_send<T: Send>(_: &T) {}
is_send(&srv);
is_send(&srv.handle());

sleep(Duration::from_millis(1_000));
assert_eq!(counter.load(Ordering::SeqCst), 0);

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let _ = srv.handle().stop(true);
rt.block_on(async move {
let _ = futures_util::poll!(&mut srv);

// available after the first poll
sleep(Duration::from_millis(500));
assert_eq!(counter.load(Ordering::SeqCst), 2);

rt.block_on(async { srv.await }).unwrap();
let _ = srv.handle().stop(true);
srv.await
})
.unwrap();
}