@@ -173,6 +173,15 @@ fn random_number() -> usize {
173173 }
174174}
175175
176+ // Abstracts over `ReadFileEx` and `WriteFileEx`
177+ type AlertableIoFn = unsafe extern "system" fn (
178+ BorrowedHandle < ' _ > ,
179+ c:: LPVOID ,
180+ c:: DWORD ,
181+ c:: LPOVERLAPPED ,
182+ c:: LPOVERLAPPED_COMPLETION_ROUTINE ,
183+ ) -> c:: BOOL ;
184+
176185impl AnonPipe {
177186 pub fn handle ( & self ) -> & Handle {
178187 & self . inner
@@ -182,7 +191,19 @@ impl AnonPipe {
182191 }
183192
184193 pub fn read ( & self , buf : & mut [ u8 ] ) -> io:: Result < usize > {
185- self . inner . read ( buf)
194+ let result = unsafe {
195+ let len = crate :: cmp:: min ( buf. len ( ) , c:: DWORD :: MAX as usize ) as c:: DWORD ;
196+ self . alertable_io_internal ( c:: ReadFileEx , buf. as_mut_ptr ( ) as _ , len)
197+ } ;
198+
199+ match result {
200+ // The special treatment of BrokenPipe is to deal with Windows
201+ // pipe semantics, which yields this error when *reading* from
202+ // a pipe after the other end has closed; we interpret that as
203+ // EOF on the pipe.
204+ Err ( ref e) if e. kind ( ) == io:: ErrorKind :: BrokenPipe => Ok ( 0 ) ,
205+ _ => result,
206+ }
186207 }
187208
188209 pub fn read_vectored ( & self , bufs : & mut [ IoSliceMut < ' _ > ] ) -> io:: Result < usize > {
@@ -195,7 +216,10 @@ impl AnonPipe {
195216 }
196217
197218 pub fn write ( & self , buf : & [ u8 ] ) -> io:: Result < usize > {
198- self . inner . write ( buf)
219+ unsafe {
220+ let len = crate :: cmp:: min ( buf. len ( ) , c:: DWORD :: MAX as usize ) as c:: DWORD ;
221+ self . alertable_io_internal ( c:: WriteFileEx , buf. as_ptr ( ) as _ , len)
222+ }
199223 }
200224
201225 pub fn write_vectored ( & self , bufs : & [ IoSlice < ' _ > ] ) -> io:: Result < usize > {
@@ -206,6 +230,99 @@ impl AnonPipe {
206230 pub fn is_write_vectored ( & self ) -> bool {
207231 self . inner . is_write_vectored ( )
208232 }
233+
234+ /// Synchronizes asynchronous reads or writes using our anonymous pipe.
235+ ///
236+ /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
237+ /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
238+ ///
239+ /// Note: This should not be used for handles we don't create.
240+ ///
241+ /// # Safety
242+ ///
243+ /// `buf` must be a pointer to a buffer that's valid for reads or writes
244+ /// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
245+ ///
246+ /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
247+ /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
248+ /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
249+ unsafe fn alertable_io_internal (
250+ & self ,
251+ io : AlertableIoFn ,
252+ buf : c:: LPVOID ,
253+ len : c:: DWORD ,
254+ ) -> io:: Result < usize > {
255+ // Use "alertable I/O" to synchronize the pipe I/O.
256+ // This has four steps.
257+ //
258+ // STEP 1: Start the asynchronous I/O operation.
259+ // This simply calls either `ReadFileEx` or `WriteFileEx`,
260+ // giving it a pointer to the buffer and callback function.
261+ //
262+ // STEP 2: Enter an alertable state.
263+ // The callback set in step 1 will not be called until the thread
264+ // enters an "alertable" state. This can be done using `SleepEx`.
265+ //
266+ // STEP 3: The callback
267+ // Once the I/O is complete and the thread is in an alertable state,
268+ // the callback will be run on the same thread as the call to
269+ // `ReadFileEx` or `WriteFileEx` done in step 1.
270+ // In the callback we simply set the result of the async operation.
271+ //
272+ // STEP 4: Return the result.
273+ // At this point we'll have a result from the callback function
274+ // and can simply return it. Note that we must not return earlier,
275+ // while the I/O is still in progress.
276+
277+ // The result that will be set from the asynchronous callback.
278+ let mut async_result: Option < AsyncResult > = None ;
279+ struct AsyncResult {
280+ error : u32 ,
281+ transfered : u32 ,
282+ }
283+
284+ // STEP 3: The callback.
285+ unsafe extern "system" fn callback (
286+ dwErrorCode : u32 ,
287+ dwNumberOfBytesTransfered : u32 ,
288+ lpOverlapped : * mut c:: OVERLAPPED ,
289+ ) {
290+ // Set `async_result` using a pointer smuggled through `hEvent`.
291+ let result = AsyncResult { error : dwErrorCode, transfered : dwNumberOfBytesTransfered } ;
292+ * ( * lpOverlapped) . hEvent . cast :: < Option < AsyncResult > > ( ) = Some ( result) ;
293+ }
294+
295+ // STEP 1: Start the I/O operation.
296+ let mut overlapped: c:: OVERLAPPED = crate :: mem:: zeroed ( ) ;
297+ // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
298+ // Therefore the documentation suggests using it to smuggle a pointer to the callback.
299+ overlapped. hEvent = & mut async_result as * mut _ as * mut _ ;
300+
301+ // Asynchronous read of the pipe.
302+ // If successful, `callback` will be called once it completes.
303+ let result = io ( self . inner . as_handle ( ) , buf, len, & mut overlapped, callback) ;
304+ if result == c:: FALSE {
305+ // We can return here because the call failed.
306+ // After this we must not return until the I/O completes.
307+ return Err ( io:: Error :: last_os_error ( ) ) ;
308+ }
309+
310+ // Wait indefinitely for the result.
311+ let result = loop {
312+ // STEP 2: Enter an alertable state.
313+ // The second parameter of `SleepEx` is used to make this sleep alertable.
314+ c:: SleepEx ( c:: INFINITE , c:: TRUE ) ;
315+ if let Some ( result) = async_result {
316+ break result;
317+ }
318+ } ;
319+ // STEP 4: Return the result.
320+ // `async_result` is always `Some` at this point
321+ match result. error {
322+ c:: ERROR_SUCCESS => Ok ( result. transfered as usize ) ,
323+ error => Err ( io:: Error :: from_raw_os_error ( error as _ ) ) ,
324+ }
325+ }
209326}
210327
211328pub fn read2 ( p1 : AnonPipe , v1 : & mut Vec < u8 > , p2 : AnonPipe , v2 : & mut Vec < u8 > ) -> io:: Result < ( ) > {
0 commit comments