From 63359210855876a9d2b39dfe9a9e6f32a518e3eb Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Thu, 2 Dec 2021 19:50:17 +0300 Subject: [PATCH 1/3] Server: hide internal structure --- actix-server/src/server.rs | 107 ++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 55 deletions(-) diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index e79c0aef7b..bc0c956108 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -120,10 +120,7 @@ pub(crate) enum ServerCommand { /// } /// ``` #[must_use = "futures do nothing unless you `.await` or poll them"] -pub enum Server { - Server(ServerInner), - Error(Option), -} +pub struct Server(Result>); impl Server { /// Create server build. @@ -131,60 +128,17 @@ impl Server { ServerBuilder::default() } - pub(crate) fn new(mut builder: ServerBuilder) -> Self { - let sockets = mem::take(&mut builder.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(); - - // Give log information on what runtime will be used. - let is_actix = actix_rt::System::try_current().is_some(); - let is_tokio = tokio::runtime::Handle::try_current().is_ok(); - - match (is_actix, is_tokio) { - (true, _) => info!("Actix runtime found; starting in Actix runtime"), - (_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"), - (_, false) => panic!("Actix or Tokio runtime not found; halting"), - } - - for (_, name, lst) in &builder.sockets { - info!( - r#"Starting service: "{}", workers: {}, listening on: {}"#, - name, - builder.threads, - lst.local_addr() - ); - } - - match Accept::start(sockets, &builder) { - Ok((waker_queue, worker_handles)) => { - // construct OS signals listener future - let signals = (builder.listen_os_signals).then(Signals::new); - - Self::Server(ServerInner { - cmd_tx: builder.cmd_tx.clone(), - cmd_rx: builder.cmd_rx, - signals, - waker_queue, - worker_handles, - worker_config: builder.worker_config, - services: builder.factories, - exit: builder.exit, - stop_task: None, - }) - } - - Err(err) => Self::Error(Some(err)), - } + pub(crate) fn new(builder: ServerBuilder) -> Self { + Server(ServerInner::new(builder).map_err(Some)) } /// Get a handle for ServerFuture that can be used to change state of actix server. /// /// See [ServerHandle](ServerHandle) for usage. pub fn handle(&self) -> ServerHandle { - match self { - Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()), - Server::Error(err) => { + match &self.0 { + Ok(inner) => ServerHandle::new(inner.cmd_tx.clone()), + Err(err) => { // TODO: i don't think this is the best way to handle server startup fail panic!( "server handle can not be obtained because server failed to start up: {}", @@ -199,12 +153,12 @@ impl Future for Server { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().get_mut() { - Self::Error(err) => Poll::Ready(Err(err + match &mut self.as_mut().get_mut().0 { + Err(err) => Poll::Ready(Err(err .take() .expect("Server future cannot be polled after error"))), - Self::Server(inner) => { + Ok(inner) => { // poll Signals if let Some(ref mut signals) = inner.signals { if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { @@ -247,6 +201,49 @@ pub struct ServerInner { } impl ServerInner { + fn new(mut builder: ServerBuilder) -> io::Result { + let sockets = mem::take(&mut builder.sockets) + .into_iter() + .map(|t| (t.0, t.2)) + .collect(); + + // Give log information on what runtime will be used. + let is_actix = actix_rt::System::try_current().is_some(); + let is_tokio = tokio::runtime::Handle::try_current().is_ok(); + + match (is_actix, is_tokio) { + (true, _) => info!("Actix runtime found; starting in Actix runtime"), + (_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"), + (_, false) => panic!("Actix or Tokio runtime not found; halting"), + } + + for (_, name, lst) in &builder.sockets { + info!( + r#"Starting service: "{}", workers: {}, listening on: {}"#, + name, + builder.threads, + lst.local_addr() + ); + } + + let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?; + + // construct OS signals listener future + let signals = (builder.listen_os_signals).then(Signals::new); + + Ok(ServerInner { + cmd_tx: builder.cmd_tx.clone(), + cmd_rx: builder.cmd_rx, + signals, + waker_queue, + worker_handles, + worker_config: builder.worker_config, + services: builder.factories, + exit: builder.exit, + stop_task: None, + }) + } + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { match item { ServerCommand::Pause(tx) => { From 9229c590d24c1bf5c5b9843b4f69fbb028962be4 Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Fri, 3 Dec 2021 23:44:31 +0300 Subject: [PATCH 2/3] Server: run after await --- actix-server/CHANGES.md | 3 + actix-server/Cargo.toml | 4 +- actix-server/src/server.rs | 154 +++++++++++++----------------- actix-server/tests/test_server.rs | 37 +++++-- 4 files changed, 99 insertions(+), 99 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 51da40f2be..11c503ac48 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* `Server` now runs only after awaiting it. [#425] + +[#425]: https://github.com/actix/actix-net/pull/425 ## 2.0.0-beta.9 - 2021-11-15 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 9631958995..f79a8fd926 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -31,7 +31,7 @@ log = "0.4" mio = { version = "0.8", features = ["os-poll", "net"] } num_cpus = "1.13" socket2 = "0.4.2" -tokio = { version = "1.5.1", features = ["sync"] } +tokio = { version = "1.5.1", features = ["sync", "macros"] } # runtime for io-uring feature tokio-uring = { version = "0.1", optional = true } @@ -42,5 +42,5 @@ actix-rt = "2.4.0" bytes = "1" env_logger = "0.9" -futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } +futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] } tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index bc0c956108..af1be4ce6a 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -9,10 +9,7 @@ use std::{ use actix_rt::{time::sleep, System}; use futures_core::future::BoxFuture; use log::{error, info}; -use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - oneshot, -}; +use tokio::sync::{mpsc::UnboundedReceiver, oneshot}; use crate::{ accept::Accept, @@ -120,7 +117,10 @@ pub(crate) enum ServerCommand { /// } /// ``` #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Server(Result>); +pub struct Server { + handle: ServerHandle, + fut: BoxFuture<'static, io::Result<()>>, +} impl Server { /// Create server build. @@ -129,62 +129,26 @@ impl Server { } pub(crate) fn new(builder: ServerBuilder) -> Self { - Server(ServerInner::new(builder).map_err(Some)) + Server { + handle: ServerHandle::new(builder.cmd_tx.clone()), + fut: Box::pin(ServerInner::run(builder)), + } } /// Get a handle for ServerFuture that can be used to change state of actix server. /// /// See [ServerHandle](ServerHandle) for usage. pub fn handle(&self) -> ServerHandle { - match &self.0 { - Ok(inner) => ServerHandle::new(inner.cmd_tx.clone()), - Err(err) => { - // TODO: i don't think this is the best way to handle server startup fail - panic!( - "server handle can not be obtained because server failed to start up: {}", - err.as_ref().unwrap() - ); - } - } + self.handle.clone() } } impl Future for Server { type Output = io::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match &mut self.as_mut().get_mut().0 { - Err(err) => Poll::Ready(Err(err - .take() - .expect("Server future cannot be polled after error"))), - - Ok(inner) => { - // poll Signals - if let Some(ref mut signals) = inner.signals { - if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { - inner.stop_task = inner.handle_signal(signal); - // drop signals listener - inner.signals = None; - } - } - - // handle stop tasks and eager drain command channel - loop { - if let Some(ref mut fut) = inner.stop_task { - // only resolve stop task and exit - return fut.as_mut().poll(cx).map(|_| Ok(())); - } - - match Pin::new(&mut inner.cmd_rx).poll_recv(cx) { - Poll::Ready(Some(cmd)) => { - // if stop task is required, set it and loop - inner.stop_task = inner.handle_cmd(cmd); - } - _ => return Poll::Pending, - } - } - } - } + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut Pin::into_inner(self).fut).poll(cx) } } @@ -193,15 +157,35 @@ pub struct ServerInner { worker_config: ServerWorkerConfig, services: Vec>, exit: bool, - cmd_tx: UnboundedSender, - cmd_rx: UnboundedReceiver, - signals: Option, waker_queue: WakerQueue, - stop_task: Option>, + stopped: bool, } impl ServerInner { - fn new(mut builder: ServerBuilder) -> io::Result { + async fn run(builder: ServerBuilder) -> io::Result<()> { + let (mut this, mut cmd_rx, mut signal_fut) = Self::run_sync(builder)?; + let listen_to_signals = signal_fut.is_some(); + + while !this.stopped { + tokio::select! { + signal = async { + signal_fut.as_mut().unwrap().await + }, if listen_to_signals => { + this.handle_signal(signal).await; + }, + Some(cmd) = cmd_rx.recv() => { + this.handle_cmd(cmd).await; + }, + else => break, + }; + } + + Ok(()) + } + + fn run_sync( + mut builder: ServerBuilder, + ) -> io::Result<(Self, UnboundedReceiver, Option)> { let sockets = mem::take(&mut builder.sockets) .into_iter() .map(|t| (t.0, t.2)) @@ -229,66 +213,59 @@ impl ServerInner { let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?; // construct OS signals listener future - let signals = (builder.listen_os_signals).then(Signals::new); + let signal_fut = (builder.listen_os_signals).then(Signals::new); - Ok(ServerInner { - cmd_tx: builder.cmd_tx.clone(), - cmd_rx: builder.cmd_rx, - signals, + let server = ServerInner { waker_queue, worker_handles, worker_config: builder.worker_config, services: builder.factories, exit: builder.exit, - stop_task: None, - }) + stopped: false, + }; + + Ok((server, builder.cmd_rx, signal_fut)) } - fn handle_cmd(&mut self, item: ServerCommand) -> Option> { + async fn handle_cmd(&mut self, item: ServerCommand) { match item { ServerCommand::Pause(tx) => { self.waker_queue.wake(WakerInterest::Pause); let _ = tx.send(()); - None } ServerCommand::Resume(tx) => { self.waker_queue.wake(WakerInterest::Resume); let _ = tx.send(()); - None } ServerCommand::Stop { graceful, completion, } => { - let exit = self.exit; + self.stopped = true; // stop accept thread self.waker_queue.wake(WakerInterest::Stop); - // stop workers - let workers_stop = self - .worker_handles - .iter() - .map(|worker| worker.stop(graceful)) - .collect::>(); - - Some(Box::pin(async move { - if graceful { - // wait for all workers to shut down - let _ = join_all(workers_stop).await; - } + if graceful { + // wait for all workers to shut down + let workers_stop = self + .worker_handles + .iter() + .map(|worker| worker.stop(graceful)) + .collect::>(); + let _ = join_all(workers_stop).await; + } - if let Some(tx) = completion { - let _ = tx.send(()); - } + if let Some(tx) = completion { + let _ = tx.send(()); + } - if exit { - sleep(Duration::from_millis(300)).await; - System::try_current().as_ref().map(System::stop); - } - })) + if self.exit { + sleep(Duration::from_millis(300)).await; + System::try_current().as_ref().map(System::stop); + } } ServerCommand::WorkerFaulted(idx) => { @@ -321,13 +298,11 @@ impl ServerInner { Err(err) => error!("can not restart worker {}: {}", idx, err), }; - - None } } } - fn handle_signal(&mut self, signal: SignalKind) -> Option> { + async fn handle_signal(&mut self, signal: SignalKind) { match signal { SignalKind::Int => { info!("SIGINT received; starting forced shutdown"); @@ -336,6 +311,7 @@ impl ServerInner { graceful: false, completion: None, }) + .await } SignalKind::Term => { @@ -345,6 +321,7 @@ impl ServerInner { graceful: true, completion: None, }) + .await } SignalKind::Quit => { @@ -354,6 +331,7 @@ impl ServerInner { graceful: false, completion: None, }) + .await } } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 07eb247880..e175e32530 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -487,27 +487,46 @@ async fn worker_restart() { } #[test] -#[should_panic] -fn no_runtime() { - // test set up in a way that would prevent time out if support for runtime-less init was added +fn no_runtime_on_init() { + use std::{thread::sleep, time::Duration}; let addr = unused_addr(); + let counter = Arc::new(AtomicUsize::new(0)); - let srv = Server::build() - .workers(1) + let mut srv = Server::build() + .workers(2) .disable_signals() - .bind("test", addr, move || { - fn_service(|_| async { Ok::<_, ()>(()) }) + .bind("test", addr, { + let counter = counter.clone(); + move || { + counter.fetch_add(1, Ordering::SeqCst); + fn_service(|_| async { Ok::<_, ()>(()) }) + } }) .unwrap() .run(); + fn is_send(_: &T) {} + is_send(&srv); + is_send(&srv.handle()); + + sleep(Duration::from_millis(1_000)); + assert_eq!(counter.load(Ordering::SeqCst), 0); + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - let _ = srv.handle().stop(true); + rt.block_on(async move { + let _ = futures_util::poll!(&mut srv); + + // available after the first poll + sleep(Duration::from_millis(500)); + assert_eq!(counter.load(Ordering::SeqCst), 2); - rt.block_on(async { srv.await }).unwrap(); + let _ = srv.handle().stop(true); + srv.await + }) + .unwrap(); } From ede686d129226bddf75be2d04f5367aafd5f3423 Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Sat, 4 Dec 2021 00:12:02 +0300 Subject: [PATCH 3/3] docs --- actix-server/src/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index af1be4ce6a..62151b1579 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -57,8 +57,8 @@ pub(crate) enum ServerCommand { /// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and /// distributes connections with a round-robin strategy. /// -/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve -/// when the server has fully shut down. +/// The [Server] must be awaited in order to run. +/// It will resolve when the server has fully shut down. /// /// # Shutdown Signals /// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a