Skip to content

Commit

Permalink
net: add read/try_read etc methods to NamedPipeServer (#3899)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle authored Jun 29, 2021
1 parent d35ff70 commit 57c90c9
Show file tree
Hide file tree
Showing 3 changed files with 611 additions and 15 deletions.
72 changes: 63 additions & 9 deletions examples/named-pipe-ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io;

#[cfg(windows)]
async fn windows_main() -> io::Result<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Interest};
use tokio::io::Interest;
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};

const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client";
Expand All @@ -13,11 +13,62 @@ async fn windows_main() -> io::Result<()> {
// Note: we wait for a client to connect.
server.connect().await?;

let mut server = BufReader::new(server);
let buf = {
let mut read_buf = [0u8; 5];
let mut read_buf_cursor = 0;

loop {
server.readable().await?;

let buf = &mut read_buf[read_buf_cursor..];

match server.try_read(buf) {
Ok(n) => {
read_buf_cursor += n;

if read_buf_cursor == read_buf.len() {
break;
}
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}

read_buf
};

{
let write_buf = b"pong\n";
let mut write_buf_cursor = 0;

loop {
let buf = &write_buf[write_buf_cursor..];

if buf.is_empty() {
break;
}

server.writable().await?;

match server.try_write(buf) {
Ok(n) => {
write_buf_cursor += n;
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e);
}
}
}
}

let mut buf = String::new();
server.read_line(&mut buf).await?;
server.write_all(b"pong\n").await?;
Ok::<_, io::Error>(buf)
});

Expand All @@ -33,9 +84,12 @@ async fn windows_main() -> io::Result<()> {
let mut write_buf_cursor = 0;

loop {
let ready = client
.ready(Interest::READABLE | Interest::WRITABLE)
.await?;
let mut interest = Interest::READABLE;
if write_buf_cursor < write_buf.len() {
interest |= Interest::WRITABLE;
}

let ready = client.ready(interest).await?;

if ready.is_readable() {
let buf = &mut read_buf[read_buf_cursor..];
Expand Down Expand Up @@ -85,7 +139,7 @@ async fn windows_main() -> io::Result<()> {

let (server, client) = tokio::try_join!(server, client)?;

assert_eq!(server?, "ping\n");
assert_eq!(server?, *b"ping\n");
assert_eq!(client?, "pong\n");

Ok(())
Expand Down
Loading

0 comments on commit 57c90c9

Please sign in to comment.