From 7804ed12ebe2b04861ecba792dc60299fc6ded6d Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 2 Mar 2022 11:52:12 +0800 Subject: [PATCH] block and wait for accept thread to exit. (#443) Co-authored-by: Rob Ede --- actix-server/CHANGES.md | 3 +++ actix-server/src/accept.rs | 6 +++--- actix-server/src/server.rs | 15 +++++++++++++-- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 14e5f4d7be..daec441357 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +- Wait for accept thread to stop before sending completion signal. [#443] + +[#443]: https://github.com/actix/actix-net/pull/443 ## 2.0.0 - 2022-01-19 diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 9f7872f873..a1c4f73228 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -41,7 +41,7 @@ impl Accept { pub(crate) fn start( sockets: Vec<(usize, MioListener)>, builder: &ServerBuilder, - ) -> io::Result<(WakerQueue, Vec)> { + ) -> io::Result<(WakerQueue, Vec, thread::JoinHandle<()>)> { let handle_server = ServerHandle::new(builder.cmd_tx.clone()); // construct poll instance and its waker @@ -73,12 +73,12 @@ impl Accept { handle_server, )?; - thread::Builder::new() + let accept_handle = thread::Builder::new() .name("actix-server acceptor".to_owned()) .spawn(move || accept.poll_with(&mut sockets)) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - Ok((waker_queue, handles_server)) + Ok((waker_queue, handles_server, accept_handle)) } fn new_with_sockets( diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index fec3b06ee4..8defa54394 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -3,6 +3,7 @@ use std::{ io, mem, pin::Pin, task::{Context, Poll}, + thread, time::Duration, }; @@ -158,6 +159,7 @@ impl Future for Server { pub struct ServerInner { worker_handles: Vec, + accept_handle: Option>, worker_config: ServerWorkerConfig, services: Vec>, waker_queue: WakerQueue, @@ -205,7 +207,7 @@ impl ServerInner { ); } - let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?; + let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?; let mux = ServerEventMultiplexer { signal_fut: (builder.listen_os_signals).then(Signals::new), @@ -214,6 +216,7 @@ impl ServerInner { let server = ServerInner { waker_queue, + accept_handle: Some(accept_handle), worker_handles, worker_config: builder.worker_config, services: builder.factories, @@ -243,7 +246,8 @@ impl ServerInner { } => { self.stopping = true; - // stop accept thread + // Signal accept thread to stop. + // Signal is non-blocking; we wait for thread to stop later. self.waker_queue.wake(WakerInterest::Stop); // send stop signal to workers @@ -258,6 +262,13 @@ impl ServerInner { let _ = join_all(workers_stop).await; } + // wait for accept thread stop + self.accept_handle + .take() + .unwrap() + .join() + .expect("Accept thread must not panic in any case"); + if let Some(tx) = completion { let _ = tx.send(()); }