Skip to content

Commit

Permalink
Server: run after await
Browse files Browse the repository at this point in the history
  • Loading branch information
aliemjay committed Dec 3, 2021
1 parent 6335921 commit c33e3c3
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 98 deletions.
4 changes: 2 additions & 2 deletions actix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ log = "0.4"
mio = { version = "0.8", features = ["os-poll", "net"] }
num_cpus = "1.13"
socket2 = "0.4.2"
tokio = { version = "1.5.1", features = ["sync"] }
tokio = { version = "1.5.1", features = ["sync", "macros"] }

# runtime for io-uring feature
tokio-uring = { version = "0.1", optional = true }
Expand All @@ -42,5 +42,5 @@ actix-rt = "2.4.0"

bytes = "1"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }
152 changes: 65 additions & 87 deletions actix-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use std::{
use actix_rt::{time::sleep, System};
use futures_core::future::BoxFuture;
use log::{error, info};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};

use crate::{
accept::Accept,
Expand Down Expand Up @@ -120,7 +117,10 @@ pub(crate) enum ServerCommand {
/// }
/// ```
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Server(Result<ServerInner, Option<io::Error>>);
pub struct Server {
handle: ServerHandle,
fut: BoxFuture<'static, io::Result<()>>,
}

impl Server {
/// Create server build.
Expand All @@ -129,62 +129,26 @@ impl Server {
}

pub(crate) fn new(builder: ServerBuilder) -> Self {
Server(ServerInner::new(builder).map_err(Some))
Server {
handle: ServerHandle::new(builder.cmd_tx.clone()),
fut: Box::pin(ServerInner::run(builder)),
}
}

/// Get a handle for ServerFuture that can be used to change state of actix server.
///
/// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
match &self.0 {
Ok(inner) => ServerHandle::new(inner.cmd_tx.clone()),
Err(err) => {
// TODO: i don't think this is the best way to handle server startup fail
panic!(
"server handle can not be obtained because server failed to start up: {}",
err.as_ref().unwrap()
);
}
}
self.handle.clone()
}
}

impl Future for Server {
type Output = io::Result<()>;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut self.as_mut().get_mut().0 {
Err(err) => Poll::Ready(Err(err
.take()
.expect("Server future cannot be polled after error"))),

Ok(inner) => {
// poll Signals
if let Some(ref mut signals) = inner.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
inner.stop_task = inner.handle_signal(signal);
// drop signals listener
inner.signals = None;
}
}

// handle stop tasks and eager drain command channel
loop {
if let Some(ref mut fut) = inner.stop_task {
// only resolve stop task and exit
return fut.as_mut().poll(cx).map(|_| Ok(()));
}

match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
Poll::Ready(Some(cmd)) => {
// if stop task is required, set it and loop
inner.stop_task = inner.handle_cmd(cmd);
}
_ => return Poll::Pending,
}
}
}
}
Pin::new(&mut self.into_inner().fut).poll(cx)
}
}

Expand All @@ -193,15 +157,35 @@ pub struct ServerInner {
worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>,
exit: bool,
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
signals: Option<Signals>,
waker_queue: WakerQueue,
stop_task: Option<BoxFuture<'static, ()>>,
stopped: bool,
}

impl ServerInner {
fn new(mut builder: ServerBuilder) -> io::Result<Self> {
async fn run(builder: ServerBuilder) -> io::Result<()> {
let (mut this, mut cmd_rx, mut signal_fut) = Self::run_sync(builder)?;
let listen_to_signals = signal_fut.is_some();

while !this.stopped {
tokio::select! {
signal = async {
signal_fut.as_mut().unwrap().await
}, if listen_to_signals => {
this.handle_signal(signal).await;
},
Some(cmd) = cmd_rx.recv() => {
this.handle_cmd(cmd).await;
},
else => break,
};
}

Ok(())
}

fn run_sync(
mut builder: ServerBuilder,
) -> io::Result<(Self, UnboundedReceiver<ServerCommand>, Option<Signals>)> {
let sockets = mem::take(&mut builder.sockets)
.into_iter()
.map(|t| (t.0, t.2))
Expand Down Expand Up @@ -229,66 +213,59 @@ impl ServerInner {
let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?;

// construct OS signals listener future
let signals = (builder.listen_os_signals).then(Signals::new);
let signal_fut = (builder.listen_os_signals).then(Signals::new);

Ok(ServerInner {
cmd_tx: builder.cmd_tx.clone(),
cmd_rx: builder.cmd_rx,
signals,
let server = ServerInner {
waker_queue,
worker_handles,
worker_config: builder.worker_config,
services: builder.factories,
exit: builder.exit,
stop_task: None,
})
stopped: false,
};

Ok((server, builder.cmd_rx, signal_fut))
}

fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
async fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(());
None
}

ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(());
None
}

ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
self.stopped = true;

// stop accept thread
self.waker_queue.wake(WakerInterest::Stop);

// stop workers
let workers_stop = self
.worker_handles
.iter()
.map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();

Some(Box::pin(async move {
if graceful {
// wait for all workers to shut down
let _ = join_all(workers_stop).await;
}
if graceful {
// wait for all workers to shut down
let workers_stop = self
.worker_handles
.iter()
.map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();
let _ = join_all(workers_stop).await;
}

if let Some(tx) = completion {
let _ = tx.send(());
}
if let Some(tx) = completion {
let _ = tx.send(());
}

if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}))
if self.exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}

ServerCommand::WorkerFaulted(idx) => {
Expand Down Expand Up @@ -321,13 +298,11 @@ impl ServerInner {

Err(err) => error!("can not restart worker {}: {}", idx, err),
};

None
}
}
}

fn handle_signal(&mut self, signal: SignalKind) -> Option<BoxFuture<'static, ()>> {
async fn handle_signal(&mut self, signal: SignalKind) {
match signal {
SignalKind::Int => {
info!("SIGINT received; starting forced shutdown");
Expand All @@ -336,6 +311,7 @@ impl ServerInner {
graceful: false,
completion: None,
})
.await
}

SignalKind::Term => {
Expand All @@ -345,6 +321,7 @@ impl ServerInner {
graceful: true,
completion: None,
})
.await
}

SignalKind::Quit => {
Expand All @@ -354,6 +331,7 @@ impl ServerInner {
graceful: false,
completion: None,
})
.await
}
}
}
Expand Down
37 changes: 28 additions & 9 deletions actix-server/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,27 +487,46 @@ async fn worker_restart() {
}

#[test]
#[should_panic]
fn no_runtime() {
// test set up in a way that would prevent time out if support for runtime-less init was added
fn no_runtime_on_init() {
use std::{thread::sleep, time::Duration};

let addr = unused_addr();
let counter = Arc::new(AtomicUsize::new(0));

let srv = Server::build()
.workers(1)
let mut srv = Server::build()
.workers(2)
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
.bind("test", addr, {
let counter = counter.clone();
move || {
counter.fetch_add(1, Ordering::SeqCst);
fn_service(|_| async { Ok::<_, ()>(()) })
}
})
.unwrap()
.run();

fn is_send<T: Send>(_: &T) {}
is_send(&srv);
is_send(&srv.handle());

sleep(Duration::from_millis(1_000));
assert_eq!(counter.load(Ordering::SeqCst), 0);

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let _ = srv.handle().stop(true);
rt.block_on(async move {
let _ = futures_util::poll!(&mut srv);

// available after the first poll
sleep(Duration::from_millis(500));
assert_eq!(counter.load(Ordering::SeqCst), 2);

rt.block_on(async { srv.await }).unwrap();
let _ = srv.handle().stop(true);
srv.await
})
.unwrap();
}

0 comments on commit c33e3c3

Please sign in to comment.