From 86c329ee26f40b6cf9e86aeaf4813b65f96ae2f0 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Mon, 13 Nov 2023 22:08:15 +1000 Subject: [PATCH] Optimize `tokio::child::windows::ChildImp::wait` (#26) --- src/tokio/child/windows.rs | 39 ++++++++++++++++++++++++++------------ src/winres.rs | 7 +++++++ 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/tokio/child/windows.rs b/src/tokio/child/windows.rs index 4af01cc..6ad6d36 100644 --- a/src/tokio/child/windows.rs +++ b/src/tokio/child/windows.rs @@ -1,4 +1,4 @@ -use std::{io::Result, mem, process::ExitStatus}; +use std::{io::Result, mem, ops::ControlFlow, process::ExitStatus}; use tokio::{ process::{Child, ChildStderr, ChildStdin, ChildStdout}, task::spawn_blocking, @@ -67,7 +67,7 @@ impl ChildImp { self.inner.id() } - fn wait_imp(handles: JobPort, timeout: DWORD) -> Result<()> { + fn wait_imp(completion_port: ThreadSafeRawHandle, timeout: DWORD) -> Result> { let mut code: DWORD = 0; let mut key: ULONG_PTR = 0; let mut overlapped = mem::MaybeUninit::::uninit(); @@ -75,7 +75,7 @@ impl ChildImp { let result = unsafe { GetQueuedCompletionStatus( - handles.completion_port, + completion_port.0, &mut code, &mut key, &mut lp_overlapped, @@ -86,25 +86,40 @@ impl ChildImp { // ignore timing out errors unless the timeout was specified to INFINITE // https://docs.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus if timeout != INFINITE && result == FALSE && lp_overlapped.is_null() { - return Ok(()); + return Ok(ControlFlow::Continue(())); } res_bool(result)?; - // don't drop them - mem::forget(handles); - - Ok(()) + Ok(ControlFlow::Break(())) } pub async fn wait(&mut self) -> Result { - let handles = self.handles.clone(); - spawn_blocking(|| Self::wait_imp(handles, INFINITE)).await??; - self.inner.wait().await + const MAX_RETRY_ATTEMPT: usize = 10; + + // Always wait for parent to exit first. + // + // It's likely that all its children has already exited and reaped by + // the time the parent exits. + let status = self.inner.wait().await?; + + let completion_port = ThreadSafeRawHandle(self.handles.completion_port); + + // Try waiting for group exit, if it is still alive after several + // attempts, then spawn a blocking task to reap them. + for retry_attempt in 1..=MAX_RETRY_ATTEMPT { + if Self::wait_imp(completion_port, 0)?.is_break() { + break; + } else if retry_attempt == MAX_RETRY_ATTEMPT { + spawn_blocking(move || Self::wait_imp(completion_port, INFINITE)).await??; + } + } + + Ok(status) } pub fn try_wait(&mut self) -> Result> { - Self::wait_imp(self.handles.clone(), 0)?; + Self::wait_imp(ThreadSafeRawHandle(self.handles.completion_port), 0)?; self.inner.try_wait() } } diff --git a/src/winres.rs b/src/winres.rs index 1dae734..75bcfc0 100644 --- a/src/winres.rs +++ b/src/winres.rs @@ -39,6 +39,13 @@ impl Drop for JobPort { unsafe impl Send for JobPort {} unsafe impl Sync for JobPort {} +#[derive(Copy, Clone)] +#[repr(transparent)] +pub(crate) struct ThreadSafeRawHandle(pub HANDLE); + +unsafe impl Send for ThreadSafeRawHandle {} +unsafe impl Sync for ThreadSafeRawHandle {} + pub(crate) fn res_null(handle: HANDLE) -> Result { if handle.is_null() { Err(Error::last_os_error())