From f299c28cb0620d58aa9f2e3f469127c6e0c37594 Mon Sep 17 00:00:00 2001 From: Olof Blomqvist Date: Sun, 13 Aug 2023 20:11:10 +0200 Subject: [PATCH 1/5] use named pipes on windows --- pallas-network/Cargo.toml | 4 ++++ pallas-network/src/facades.rs | 37 +++++++++++++++++++++++++++++ pallas-network/src/multiplexer.rs | 39 +++++++++++++++++++++++++++---- 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/pallas-network/Cargo.toml b/pallas-network/Cargo.toml index 55b26852..71fada46 100644 --- a/pallas-network/Cargo.toml +++ b/pallas-network/Cargo.toml @@ -23,6 +23,10 @@ thiserror = "1.0.31" tokio = { version = "1", features = ["net", "io-util", "time", "sync"] } tracing = "0.1.37" +[target.'cfg(windows)'.dependencies] +tokio-named-pipes = "0.1.0" +windows-sys = "0.48.0" + [dev-dependencies] tracing-subscriber = "0.3.16" tokio = { version = "1", features = ["full"] } diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 31a57c79..c4c81381 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -126,6 +126,43 @@ impl NodeClient { }) } + #[cfg(target_os = "windows")] + pub async fn connect(pipe_name: impl AsRef, magic: u64) -> Result { + debug!("connecting"); + + let bearer = Bearer::connect_pipe(pipe_name) + .await + .map_err(Error::ConnectFailure)?; + + let mut plexer = multiplexer::Plexer::new(bearer); + + let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE); + let cs_channel = plexer.subscribe_client(PROTOCOL_N2C_CHAIN_SYNC); + let sq_channel = plexer.subscribe_client(PROTOCOL_N2C_STATE_QUERY); + + let plexer_handle = tokio::spawn(async move { plexer.run().await }); + + let versions = handshake::n2c::VersionTable::only_v10(magic); + let mut client = handshake::Client::new(hs_channel); + + let handshake = client + .handshake(versions) + .await + .map_err(Error::HandshakeProtocol)?; + + if let handshake::Confirmation::Rejected(reason) = handshake { + error!(?reason, "handshake refused"); + return Err(Error::IncompatibleVersion); + } + + Ok(Self { + plexer_handle, + handshake, + chainsync: chainsync::Client::new(cs_channel), + statequery: localstate::Client::new(sq_channel), + }) + } + #[cfg(not(target_os = "windows"))] pub async fn handshake_query( path: impl AsRef, diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 12622855..f425f780 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -2,10 +2,14 @@ use byteorder::{ByteOrder, NetworkEndian}; use pallas_codec::{minicbor, Fragment}; + +use tokio::io::AsyncWriteExt; + +#[cfg(windows)] +use tokio::net::windows::named_pipe::NamedPipeClient; + use std::net::SocketAddr; -use std::path::Path; use thiserror::Error; -use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio::select; use tokio::sync::mpsc::error::SendError; @@ -63,6 +67,7 @@ pub struct Segment { #[cfg(target_os = "windows")] pub enum Bearer { Tcp(TcpStream), + Socket(NamedPipeClient) } #[cfg(not(target_os = "windows"))] @@ -84,25 +89,45 @@ impl Bearer { Ok((Self::Tcp(stream), addr)) } + #[cfg(windows)] + pub async fn connect_pipe(pipe_name: impl AsRef) -> Result{ + let client = loop { + match tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name) { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PIPE_BUSY as i32) => (), + Err(e) => return Err(e), + } + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + }; + Ok(Self::Socket(client)) + } + #[cfg(not(target_os = "windows"))] pub async fn connect_unix(path: impl AsRef) -> Result { let stream = UnixStream::connect(path).await?; Ok(Self::Unix(stream)) } - pub async fn readable(&self) -> tokio::io::Result<()> { + pub async fn readable(&mut self) -> tokio::io::Result<()> { match self { Bearer::Tcp(x) => x.readable().await, #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.readable().await, + + #[cfg(target_os = "windows")] + Bearer::Socket(x) => x.readable().await + } } - fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result { + async fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result { match self { Bearer::Tcp(x) => x.try_read(buf), #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.try_read(buf), + #[cfg(target_os = "windows")] + Bearer::Socket(x) => x.try_read(buf) } } @@ -111,6 +136,8 @@ impl Bearer { Bearer::Tcp(x) => x.write_all(buf).await, #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.write_all(buf).await, + #[cfg(target_os = "windows")] + Bearer::Socket(x) => x.write_all(buf).await, } } @@ -119,6 +146,8 @@ impl Bearer { Bearer::Tcp(x) => x.flush().await, #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.flush().await, + #[cfg(target_os = "windows")] + Bearer::Socket(x) => x.flush().await, } } } @@ -166,7 +195,7 @@ impl SegmentBuffer { let remaining = required - self.1.len(); let mut buf = vec![0u8; remaining]; - match self.0.try_read(&mut buf) { + match self.0.try_read(&mut buf).await { Ok(0) => { error!("empty bearer"); break Err(Error::EmptyBearer); From 69376aed991a5aabf5da4a057df70c1532a92ced Mon Sep 17 00:00:00 2001 From: Olof Blomqvist Date: Thu, 24 Aug 2023 10:10:46 +0200 Subject: [PATCH 2/5] rename from socket to named_pipe --- pallas-network/src/facades.rs | 2 +- pallas-network/src/multiplexer.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index c4c81381..55594830 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -130,7 +130,7 @@ impl NodeClient { pub async fn connect(pipe_name: impl AsRef, magic: u64) -> Result { debug!("connecting"); - let bearer = Bearer::connect_pipe(pipe_name) + let bearer = Bearer::connect_named_pipe(pipe_name) .await .map_err(Error::ConnectFailure)?; diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index f425f780..0ebdbe93 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -67,7 +67,7 @@ pub struct Segment { #[cfg(target_os = "windows")] pub enum Bearer { Tcp(TcpStream), - Socket(NamedPipeClient) + NamedPipe(NamedPipeClient) } #[cfg(not(target_os = "windows"))] @@ -90,7 +90,7 @@ impl Bearer { } #[cfg(windows)] - pub async fn connect_pipe(pipe_name: impl AsRef) -> Result{ + pub async fn connect_named_pipe(pipe_name: impl AsRef) -> Result{ let client = loop { match tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name) { Ok(client) => break client, @@ -100,7 +100,7 @@ impl Bearer { tokio::time::sleep(std::time::Duration::from_millis(50)).await; }; - Ok(Self::Socket(client)) + Ok(Self::NamedPipe(client)) } #[cfg(not(target_os = "windows"))] @@ -116,7 +116,7 @@ impl Bearer { Bearer::Unix(x) => x.readable().await, #[cfg(target_os = "windows")] - Bearer::Socket(x) => x.readable().await + Bearer::NamedPipe(x) => x.readable().await } } @@ -127,7 +127,7 @@ impl Bearer { #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.try_read(buf), #[cfg(target_os = "windows")] - Bearer::Socket(x) => x.try_read(buf) + Bearer::NamedPipe(x) => x.try_read(buf) } } @@ -137,7 +137,7 @@ impl Bearer { #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.write_all(buf).await, #[cfg(target_os = "windows")] - Bearer::Socket(x) => x.write_all(buf).await, + Bearer::NamedPipe(x) => x.write_all(buf).await, } } @@ -147,7 +147,7 @@ impl Bearer { #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.flush().await, #[cfg(target_os = "windows")] - Bearer::Socket(x) => x.flush().await, + Bearer::NamedPipe(x) => x.flush().await, } } } From 38fe0efb421133e768688b56d57d85069006ee72 Mon Sep 17 00:00:00 2001 From: Olof Blomqvist Date: Sun, 27 Aug 2023 20:55:52 +0200 Subject: [PATCH 3/5] fix linux build --- pallas-network/src/multiplexer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 0ebdbe93..2d12a363 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -104,7 +104,7 @@ impl Bearer { } #[cfg(not(target_os = "windows"))] - pub async fn connect_unix(path: impl AsRef) -> Result { + pub async fn connect_unix(path: impl AsRef) -> Result { let stream = UnixStream::connect(path).await?; Ok(Self::Unix(stream)) } From ca797bd745e659ee0acfe7d77c4631224d73abb7 Mon Sep 17 00:00:00 2001 From: Olof Blomqvist Date: Sat, 7 Oct 2023 14:08:00 +0200 Subject: [PATCH 4/5] share common code between connect methods --- pallas-network/src/facades.rs | 58 +++++++++++++++-------------------- 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index db2b64b8..70debf89 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -8,7 +8,13 @@ use tracing::{debug, error}; #[cfg(not(target_os = "windows"))] use tokio::net::UnixListener; -use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber}; +use crate::miniprotocols::handshake::{ + n2c::VersionData, + n2n, VersionTable, + Confirmation, + VersionNumber +}; + use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE; use crate::{ miniprotocols::{ @@ -154,13 +160,8 @@ pub struct NodeClient { } impl NodeClient { - #[cfg(not(target_os = "windows"))] - pub async fn connect(path: impl AsRef, magic: u64) -> Result { - debug!("connecting"); - - let bearer = Bearer::connect_unix(path) - .await - .map_err(Error::ConnectFailure)?; + + async fn connect_bearer(bearer:Bearer,versions: VersionTable) -> Result { let mut plexer = multiplexer::Plexer::new(bearer); @@ -170,7 +171,6 @@ impl NodeClient { let plexer_handle = tokio::spawn(async move { plexer.run().await }); - let versions = handshake::n2c::VersionTable::v10_and_above(magic); let mut client = handshake::Client::new(hs_channel); let handshake = client @@ -191,41 +191,33 @@ impl NodeClient { }) } - #[cfg(target_os = "windows")] - pub async fn connect(pipe_name: impl AsRef, magic: u64) -> Result { + + #[cfg(not(target_os = "windows"))] + pub async fn connect(path: impl AsRef, magic: u64) -> Result { debug!("connecting"); - let bearer = Bearer::connect_named_pipe(pipe_name) + let bearer = Bearer::connect_unix(path) .await .map_err(Error::ConnectFailure)?; - let mut plexer = multiplexer::Plexer::new(bearer); - - let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE); - let cs_channel = plexer.subscribe_client(PROTOCOL_N2C_CHAIN_SYNC); - let sq_channel = plexer.subscribe_client(PROTOCOL_N2C_STATE_QUERY); + let versions = handshake::n2c::VersionTable::v10_and_above(magic); - let plexer_handle = tokio::spawn(async move { plexer.run().await }); + Self::connect_bearer(bearer,versions).await + } - let versions = handshake::n2c::VersionTable::only_v10(magic); - let mut client = handshake::Client::new(hs_channel); + #[cfg(target_os = "windows")] + pub async fn connect(pipe_name: impl AsRef, magic: u64) -> Result { + debug!("connecting"); - let handshake = client - .handshake(versions) + let bearer = Bearer::connect_named_pipe(pipe_name) .await - .map_err(Error::HandshakeProtocol)?; + .map_err(Error::ConnectFailure)?; + + let versions = + handshake::n2c::VersionTable::only_v10(magic); - if let handshake::Confirmation::Rejected(reason) = handshake { - error!(?reason, "handshake refused"); - return Err(Error::IncompatibleVersion); - } + Self::connect_bearer(bearer,versions).await - Ok(Self { - plexer_handle, - handshake, - chainsync: chainsync::Client::new(cs_channel), - statequery: localstate::Client::new(sq_channel), - }) } #[cfg(not(target_os = "windows"))] From efdd0b2ea5f56bd00ac6b09bdeb9a4a94b0982c3 Mon Sep 17 00:00:00 2001 From: Olof Blomqvist Date: Sat, 7 Oct 2023 14:09:02 +0200 Subject: [PATCH 5/5] revert change of method sig to async --- pallas-network/src/multiplexer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 26a8e695..57d23d3a 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -130,7 +130,7 @@ impl Bearer { } } - async fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result { + fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result { match self { Bearer::Tcp(x) => x.try_read(buf), #[cfg(not(target_os = "windows"))] @@ -203,8 +203,8 @@ impl SegmentBuffer { let remaining = required - self.1.len(); let mut buf = vec![0u8; remaining]; - - match self.0.try_read(&mut buf).await { + + match self.0.try_read(&mut buf) { Ok(0) => { error!("empty bearer"); break Err(Error::EmptyBearer);