From ded8281275ea97fee64b1d320afa0dd922161656 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Fri, 22 Jan 2021 14:33:53 +0100 Subject: [PATCH 1/7] Replace tokio::io::Result with std This is simply a re-exported from std, so no change should be observed. Done to reduce explicit API surface used by Tokio the dependency. --- src/unix.rs | 2 +- src/win.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/unix.rs b/src/unix.rs index 666dd9a..1fd528a 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -68,7 +68,7 @@ pub struct Endpoint { impl Endpoint { /// Stream of incoming connections - pub fn incoming(self) -> io::Result> + 'static> { + pub fn incoming(self) -> io::Result> + 'static> { let listener = self.inner()?; // the call to bind in `inner()` creates the file // `apply_permission()` will set the file permissions. diff --git a/src/win.rs b/src/win.rs index bfd6a60..ec950dd 100644 --- a/src/win.rs +++ b/src/win.rs @@ -30,7 +30,7 @@ pub struct Endpoint { impl Endpoint { /// Stream of incoming connections - pub fn incoming(mut self) -> io::Result> + 'static> { + pub fn incoming(mut self) -> io::Result> + 'static> { let pipe = self.inner()?; Ok(Incoming { path: self.path.clone(), @@ -144,7 +144,7 @@ pub struct Incoming { } impl Stream for Incoming { - type Item = tokio::io::Result; + type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { match self.inner.pipe.get_ref().connect() { From 03bc9374d4cdb56e0a97bd1c01b2023f6b789262 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Fri, 22 Jan 2021 14:36:00 +0100 Subject: [PATCH 2/7] Don't glob import from Tokio prelude Again, done to reduce explicit API surface used from Tokio --- src/unix.rs | 2 +- src/win.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/unix.rs b/src/unix.rs index 1fd528a..57d072c 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -2,7 +2,7 @@ use libc::chmod; use std::ffi::CString; use std::io::{self, Error}; use futures::Stream; -use tokio::prelude::*; +use tokio::prelude::{AsyncRead, AsyncWrite}; use tokio::net::{UnixListener, UnixStream}; use std::path::Path; use std::pin::Pin; diff --git a/src/win.rs b/src/win.rs index ec950dd..7b2d4c9 100644 --- a/src/win.rs +++ b/src/win.rs @@ -11,7 +11,7 @@ use std::marker; use std::mem; use std::ptr; use futures::Stream; -use tokio::prelude::*; +use tokio::prelude::{AsyncRead, AsyncWrite}; use std::pin::Pin; use std::task::{Context, Poll}; use std::path::Path; From 86448fd829183f24d35dfe539e5565aa798ef03a Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Fri, 22 Jan 2021 14:52:54 +0100 Subject: [PATCH 3/7] unix: Migrate to Tokio 1.0 --- Cargo.toml | 5 ++++- src/unix.rs | 16 +++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 14844d4..e1134b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,11 @@ log = "0.4" mio-named-pipes = "0.1" miow = "0.3.3" rand = "0.7" -tokio = { version = "0.2", features = ["io-driver", "io-util", "uds", "stream", "rt-core", "macros", "time"] } +tokio = { version = "1.0.0", features = ["net"] } libc = "0.2.65" [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase", "winnt", "accctrl", "aclapi", "securitybaseapi", "minwinbase", "winbase"] } + +[dev-dependencies] +tokio = { version = "1.0.0", features = ["rt", "time", "macros"] } diff --git a/src/unix.rs b/src/unix.rs index 57d072c..1cdf00c 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -2,12 +2,11 @@ use libc::chmod; use std::ffi::CString; use std::io::{self, Error}; use futures::Stream; -use tokio::prelude::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::{UnixListener, UnixStream}; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; -use std::mem::MaybeUninit; /// Socket permissions and ownership on UNIX pub struct SecurityAttributes { @@ -124,7 +123,10 @@ impl Stream for Incoming { cx: &mut Context<'_>, ) -> Poll> { let this = Pin::into_inner(self); - Pin::new(&mut this.listener).poll_next(cx) + match Pin::new(&mut this.listener).poll_accept(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(result) => Poll::Ready(Some(result.map(|(stream, _addr)| stream))), + } } } @@ -149,15 +151,11 @@ impl Connection { } impl AsyncRead for Connection { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - fn poll_read( self: Pin<&mut Self>, ctx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { let this = Pin::into_inner(self); Pin::new(&mut this.inner).poll_read(ctx, buf) } From c3595c629c10943c7765218817acda14e7ca0ce6 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Fri, 22 Jan 2021 15:01:13 +0100 Subject: [PATCH 4/7] Adapt tests and examples --- Cargo.toml | 2 +- examples/client.rs | 6 +++--- examples/server.rs | 8 ++------ src/lib.rs | 12 ++++-------- src/win.rs | 2 +- 5 files changed, 11 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e1134b7..925993b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,4 +24,4 @@ libc = "0.2.65" winapi = { version = "0.3", features = ["winbase", "winnt", "accctrl", "aclapi", "securitybaseapi", "minwinbase", "winbase"] } [dev-dependencies] -tokio = { version = "1.0.0", features = ["rt", "time", "macros"] } +tokio = { version = "1.0.0", features = ["io-util", "rt", "time", "macros"] } diff --git a/examples/client.rs b/examples/client.rs index 418461d..5a06e9f 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,7 +1,7 @@ -use tokio::{self, prelude::*}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use parity_tokio_ipc::Endpoint; -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() { let path = std::env::args().nth(1).expect("Run it with server path to connect as argument"); @@ -19,6 +19,6 @@ async fn main() { break; } - tokio::time::delay_for(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } diff --git a/examples/server.rs b/examples/server.rs index 9c53ab3..5e04f48 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,9 +1,5 @@ use futures::StreamExt as _; -use tokio::{ - prelude::*, - self, - io::split, -}; +use tokio::io::{split, AsyncReadExt, AsyncWriteExt}; use parity_tokio_ipc::{Endpoint, SecurityAttributes}; @@ -40,7 +36,7 @@ async fn run_server(path: String) { }; } -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() { let path = std::env::args().nth(1).expect("Run it with server path as argument"); run_server(path).await diff --git a/src/lib.rs b/src/lib.rs index 4d2cbd7..339f119 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,13 +47,9 @@ pub fn dummy_endpoint() -> String { #[cfg(test)] mod tests { - use tokio::prelude::*; use futures::{channel::oneshot, StreamExt as _, FutureExt as _}; use std::time::Duration; - use tokio::{ - self, - io::split, - }; + use tokio::io::{split, AsyncReadExt, AsyncWriteExt}; use super::{dummy_endpoint, Endpoint, SecurityAttributes}; use std::path::Path; @@ -100,12 +96,12 @@ mod tests { }); tokio::spawn(server); - tokio::time::delay_for(Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(2)).await; println!("Connecting to client 0..."); let mut client_0 = Endpoint::connect(&path).await .expect("failed to open client_0"); - tokio::time::delay_for(Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(2)).await; println!("Connecting to client 1..."); let mut client_1 = Endpoint::connect(&path).await .expect("failed to open client_1"); @@ -125,7 +121,7 @@ mod tests { // shutdown server if let Ok(()) = shutdown_tx.send(()) { // wait one second for the file to be deleted. - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; let path = Path::new(&path); // assert that it has assert!(!path.exists()); diff --git a/src/win.rs b/src/win.rs index 7b2d4c9..a4f5bfd 100644 --- a/src/win.rs +++ b/src/win.rs @@ -11,7 +11,7 @@ use std::marker; use std::mem; use std::ptr; use futures::Stream; -use tokio::prelude::{AsyncRead, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncWrite}; use std::pin::Pin; use std::task::{Context, Poll}; use std::path::Path; From d7ade0ecdcafab1cd75d7ec0718f707dc5a0418c Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Tue, 29 Jun 2021 22:25:43 +0200 Subject: [PATCH 5/7] win: Rewrite using new named_pipe module introduced in Tokio 1.7 --- Cargo.toml | 6 +- examples/server.rs | 3 +- src/lib.rs | 3 +- src/win.rs | 223 +++++++++++++++++++-------------------------- 4 files changed, 100 insertions(+), 135 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 925993b..b280070 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,14 +14,12 @@ Interprocess communication library for tokio. [dependencies] futures = "0.3" log = "0.4" -mio-named-pipes = "0.1" -miow = "0.3.3" rand = "0.7" -tokio = { version = "1.0.0", features = ["net"] } +tokio = { version = "1.7.0", features = ["net", "time"] } libc = "0.2.65" [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase", "winnt", "accctrl", "aclapi", "securitybaseapi", "minwinbase", "winbase"] } [dev-dependencies] -tokio = { version = "1.0.0", features = ["io-util", "rt", "time", "macros"] } +tokio = { version = "1.7.0", features = ["io-util", "rt", "time", "macros"] } diff --git a/examples/server.rs b/examples/server.rs index 5e04f48..bad6b6c 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -7,7 +7,8 @@ async fn run_server(path: String) { let mut endpoint = Endpoint::new(path); endpoint.set_security_attributes(SecurityAttributes::allow_everyone_create().unwrap()); - let mut incoming = endpoint.incoming().expect("failed to open new socket"); + let incoming = endpoint.incoming().expect("failed to open new socket"); + futures::pin_mut!(incoming); while let Some(result) = incoming.next().await { diff --git a/src/lib.rs b/src/lib.rs index 339f119..b842c6a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,7 +64,8 @@ mod tests { .set_mode(0o777) .unwrap() ); - let mut incoming = endpoint.incoming().expect("failed to open up a new socket"); + let incoming = endpoint.incoming().expect("failed to open up a new socket"); + futures::pin_mut!(incoming); while let Some(result) = incoming.next().await { match result { diff --git a/src/win.rs b/src/win.rs index a4f5bfd..61eaefb 100644 --- a/src/win.rs +++ b/src/win.rs @@ -6,61 +6,69 @@ use winapi::um::securitybaseapi::*; use winapi::um::winbase::{LocalAlloc, LocalFree}; use winapi::um::winnt::*; +use futures::Stream; use std::io; use std::marker; use std::mem; -use std::ptr; -use futures::Stream; -use tokio::io::{AsyncRead, AsyncWrite}; +use std::path::Path; use std::pin::Pin; +use std::ptr; use std::task::{Context, Poll}; -use std::path::Path; -use std::mem::MaybeUninit; -use tokio::io::PollEvented; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncRead, AsyncWrite}; + +use tokio::net::windows::named_pipe; -type NamedPipe = PollEvented; +enum NamedPipe { + Server(named_pipe::NamedPipeServer), + Client(named_pipe::NamedPipeClient), +} -const PIPE_AVAILABILITY_TIMEOUT: u64 = 5000; +const PIPE_AVAILABILITY_TIMEOUT: Duration = Duration::from_secs(5); /// Endpoint implementation for windows pub struct Endpoint { path: String, security_attributes: SecurityAttributes, + created_listener: bool, } impl Endpoint { /// Stream of incoming connections - pub fn incoming(mut self) -> io::Result> + 'static> { - let pipe = self.inner()?; - Ok(Incoming { - path: self.path.clone(), - inner: NamedPipeSupport { - path: self.path, - pipe, - security_attributes: self.security_attributes, - }, - }) + pub fn incoming( + mut self, + ) -> io::Result> + 'static> { + let pipe = self.create_listener()?; + + let stream = + futures::stream::try_unfold((pipe, self), |(listener, mut endpoint)| async move { + let () = listener.connect().await?; + + let new_listener = endpoint.create_listener()?; + + let conn = Connection::wrap(NamedPipe::Server(listener)); + + Ok(Some((conn, (new_listener, endpoint)))) + }); + + Ok(stream) } - /// Inner platform-dependant state of the endpoint - fn inner(&mut self) -> io::Result { - use miow::pipe::NamedPipeBuilder; - use std::os::windows::io::*; - - let raw_handle = unsafe { - NamedPipeBuilder::new(&self.path) - .first(true) - .inbound(true) - .accept_remote(false) - .outbound(true) - .out_buffer_size(65536) - .in_buffer_size(65536) - .with_security_attributes(self.security_attributes.as_ptr())? - .into_raw_handle() - }; + fn create_listener(&mut self) -> io::Result { + let server = unsafe { + named_pipe::ServerOptions::new() + .first_pipe_instance(!self.created_listener) + .reject_remote_clients(true) + .access_inbound(true) + .access_outbound(true) + .create_with_security_attributes_raw( + &self.path, + self.security_attributes.as_ptr() as *mut libc::c_void, + ) + }?; + self.created_listener = true; - let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) }; - NamedPipe::new(mio_pipe) + Ok(server) } /// Set security attributes for the connection @@ -75,29 +83,33 @@ impl Endpoint { /// Make new connection using the provided path and running event pool. pub async fn connect>(path: P) -> io::Result { - Ok(Connection::wrap(Self::connect_inner(path.as_ref())?)) - } + let path = path.as_ref(); + + let mut duration = Duration::new(0, 0); + let client = loop { + match named_pipe::ClientOptions::new() + .read(true) + .write(true) + .open(path) + { + Ok(client) => break client, + Err(e) + if e.raw_os_error() + == Some(winapi::shared::winerror::ERROR_PIPE_BUSY as i32) => + { + if duration >= PIPE_AVAILABILITY_TIMEOUT { + return Err(e); + } + } + Err(e) => return Err(e), + } - fn connect_inner(path: &Path) -> io::Result { - use std::fs::OpenOptions; - use std::os::windows::fs::OpenOptionsExt; - use std::os::windows::io::{FromRawHandle, IntoRawHandle}; - use winapi::um::winbase::FILE_FLAG_OVERLAPPED; + let now = Instant::now(); + tokio::time::sleep(Duration::from_millis(50)).await; + duration += now.elapsed(); + }; - // Wait for the pipe to become available or fail after 5 seconds. - miow::pipe::NamedPipe::wait( - path, - Some(std::time::Duration::from_millis(PIPE_AVAILABILITY_TIMEOUT)), - )?; - let file = OpenOptions::new() - .read(true) - .write(true) - .custom_flags(FILE_FLAG_OVERLAPPED) - .open(path)?; - let mio_pipe = - unsafe { mio_named_pipes::NamedPipe::from_raw_handle(file.into_raw_handle()) }; - let pipe = NamedPipe::new(mio_pipe)?; - Ok(pipe) + Ok(Connection::wrap(NamedPipe::Client(client))) } /// New IPC endpoint at the given path @@ -105,64 +117,7 @@ impl Endpoint { Endpoint { path, security_attributes: SecurityAttributes::empty(), - } - } -} - -struct NamedPipeSupport { - path: String, - pipe: NamedPipe, - security_attributes: SecurityAttributes, -} - -impl NamedPipeSupport { - fn replacement_pipe(&mut self) -> io::Result { - use miow::pipe::NamedPipeBuilder; - use std::os::windows::io::*; - - let raw_handle = unsafe { - NamedPipeBuilder::new(&self.path) - .first(false) - .inbound(true) - .outbound(true) - .out_buffer_size(65536) - .in_buffer_size(65536) - .with_security_attributes(self.security_attributes.as_ptr())? - .into_raw_handle() - }; - - let mio_pipe = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(raw_handle) }; - NamedPipe::new(mio_pipe) - } -} - -/// Stream of incoming connections -pub struct Incoming { - #[allow(dead_code)] - path: String, - inner: NamedPipeSupport, -} - -impl Stream for Incoming { - type Item = std::io::Result; - - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - match self.inner.pipe.get_ref().connect() { - Ok(()) => { - log::trace!("Incoming connection polled successfully"); - let new_listener = self.inner.replacement_pipe()?; - Poll::Ready( - Some(Ok(Connection::wrap(std::mem::replace(&mut self.inner.pipe, new_listener)))) - ) - } - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - self.inner.pipe.clear_write_ready(ctx)?; - Poll::Pending - } else { - Poll::Ready(Some(Err(e))) - } - } + created_listener: false, } } } @@ -174,23 +129,22 @@ pub struct Connection { impl Connection { /// Wraps an existing named pipe - pub fn wrap(pipe: NamedPipe) -> Self { + fn wrap(pipe: NamedPipe) -> Self { Self { inner: pipe } } } impl AsyncRead for Connection { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - fn poll_read( self: Pin<&mut Self>, ctx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { let this = Pin::into_inner(self); - Pin::new(&mut this.inner).poll_read(ctx, buf) + match this.inner { + NamedPipe::Client(ref mut c) => Pin::new(c).poll_read(ctx, buf), + NamedPipe::Server(ref mut s) => Pin::new(s).poll_read(ctx, buf), + } } } @@ -201,17 +155,26 @@ impl AsyncWrite for Connection { buf: &[u8], ) -> Poll> { let this = Pin::into_inner(self); - Pin::new(&mut this.inner).poll_write(ctx, buf) + match this.inner { + NamedPipe::Client(ref mut c) => Pin::new(c).poll_write(ctx, buf), + NamedPipe::Server(ref mut s) => Pin::new(s).poll_write(ctx, buf), + } } fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { let this = Pin::into_inner(self); - Pin::new(&mut this.inner).poll_flush(ctx) + match this.inner { + NamedPipe::Client(ref mut c) => Pin::new(c).poll_flush(ctx), + NamedPipe::Server(ref mut s) => Pin::new(s).poll_flush(ctx), + } } fn poll_shutdown(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { let this = Pin::into_inner(self); - Pin::new(&mut this.inner).poll_shutdown(ctx) + match this.inner { + NamedPipe::Client(ref mut c) => Pin::new(c).poll_shutdown(ctx), + NamedPipe::Server(ref mut s) => Pin::new(s).poll_shutdown(ctx), + } } } @@ -225,13 +188,15 @@ pub const DEFAULT_SECURITY_ATTRIBUTES: SecurityAttributes = SecurityAttributes { descriptor: SecurityDescriptor { descriptor_ptr: ptr::null_mut(), }, - acl: Acl { acl_ptr: ptr::null_mut() }, + acl: Acl { + acl_ptr: ptr::null_mut(), + }, attrs: SECURITY_ATTRIBUTES { nLength: mem::size_of::() as u32, lpSecurityDescriptor: ptr::null_mut(), bInheritHandle: 0, }, - }) + }), }; impl SecurityAttributes { @@ -281,6 +246,7 @@ impl Sid { fn everyone_sid() -> io::Result { let mut sid_ptr = ptr::null_mut(); let result = unsafe { + #[allow(const_item_mutation)] AllocateAndInitializeSid( SECURITY_WORLD_SID_AUTHORITY.as_mut_ptr() as *mut _, 1, @@ -500,5 +466,4 @@ mod test { .allow_everyone_connect() .expect("failed to create security attributes that allow everyone to read and write to/from a pipe"); } - } From 5a930fc245adbb9c8874cfd6510170293a6a6ce7 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Wed, 30 Jun 2021 13:25:59 +0200 Subject: [PATCH 6/7] win: Explicitly specify named pipe buffer size --- src/win.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/win.rs b/src/win.rs index 61eaefb..0e136b2 100644 --- a/src/win.rs +++ b/src/win.rs @@ -61,6 +61,8 @@ impl Endpoint { .reject_remote_clients(true) .access_inbound(true) .access_outbound(true) + .in_buffer_size(65536) + .out_buffer_size(65536) .create_with_security_attributes_raw( &self.path, self.security_attributes.as_ptr() as *mut libc::c_void, From b86fd388febfd0c55d5d678b21fa2bb9c2ca05b6 Mon Sep 17 00:00:00 2001 From: Igor Matuszewski Date: Wed, 30 Jun 2021 13:35:32 +0200 Subject: [PATCH 7/7] win: Clean up connection time out handling --- src/win.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/win.rs b/src/win.rs index 0e136b2..ac987b4 100644 --- a/src/win.rs +++ b/src/win.rs @@ -1,4 +1,4 @@ -use winapi::shared::winerror::ERROR_SUCCESS; +use winapi::shared::winerror::{ERROR_PIPE_BUSY, ERROR_SUCCESS}; use winapi::um::accctrl::*; use winapi::um::aclapi::*; use winapi::um::minwinbase::{LPTR, PSECURITY_ATTRIBUTES, SECURITY_ATTRIBUTES}; @@ -87,7 +87,9 @@ impl Endpoint { pub async fn connect>(path: P) -> io::Result { let path = path.as_ref(); - let mut duration = Duration::new(0, 0); + // There is not async equivalent of waiting for a named pipe in Windows, + // so we keep trying or sleeping for a bit, until we hit a timeout + let attempt_start = Instant::now(); let client = loop { match named_pipe::ClientOptions::new() .read(true) @@ -95,20 +97,16 @@ impl Endpoint { .open(path) { Ok(client) => break client, - Err(e) - if e.raw_os_error() - == Some(winapi::shared::winerror::ERROR_PIPE_BUSY as i32) => - { - if duration >= PIPE_AVAILABILITY_TIMEOUT { + Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => { + if attempt_start.elapsed() < PIPE_AVAILABILITY_TIMEOUT { + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } else { return Err(e); } } Err(e) => return Err(e), } - - let now = Instant::now(); - tokio::time::sleep(Duration::from_millis(50)).await; - duration += now.elapsed(); }; Ok(Connection::wrap(NamedPipe::Client(client)))