Skip to content

Commit

Permalink
feat(network): implement windows named pipes connections (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlofBlomqvist authored Nov 16, 2023
1 parent 6963dd9 commit d6bcffe
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 14 deletions.
4 changes: 4 additions & 0 deletions pallas-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ thiserror = "1.0.31"
tokio = { version = "1", features = ["net", "io-util", "time", "sync", "macros"] }
tracing = "0.1.37"

[target.'cfg(windows)'.dependencies]
tokio-named-pipes = "0.1.0"
windows-sys = "0.48.0"

[dev-dependencies]
tracing-subscriber = "0.3.16"
tokio = { version = "1", features = ["full"] }
Expand Down
50 changes: 40 additions & 10 deletions pallas-network/src/facades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ use tracing::{debug, error};
#[cfg(not(target_os = "windows"))]
use tokio::net::UnixListener;

use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber};
use crate::miniprotocols::handshake::{
n2c::VersionData,
n2n, VersionTable,
Confirmation,

Check warning on line 14 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

unused import: `Confirmation`
VersionNumber
};

use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE;
use crate::{
miniprotocols::{
Expand Down Expand Up @@ -154,13 +160,8 @@ pub struct NodeClient {
}

impl NodeClient {
#[cfg(not(target_os = "windows"))]
pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
debug!("connecting");

let bearer = Bearer::connect_unix(path)
.await
.map_err(Error::ConnectFailure)?;

async fn connect_bearer(bearer:Bearer,versions: VersionTable<VersionData>) -> Result<Self, Error> {

let mut plexer = multiplexer::Plexer::new(bearer);

Expand All @@ -170,7 +171,6 @@ impl NodeClient {

let plexer_handle = tokio::spawn(async move { plexer.run().await });

let versions = handshake::n2c::VersionTable::v10_and_above(magic);
let mut client = handshake::Client::new(hs_channel);

let handshake = client
Expand All @@ -191,6 +191,35 @@ impl NodeClient {
})
}


#[cfg(not(target_os = "windows"))]
pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
debug!("connecting");

let bearer = Bearer::connect_unix(path)
.await
.map_err(Error::ConnectFailure)?;

let versions = handshake::n2c::VersionTable::v10_and_above(magic);

Self::connect_bearer(bearer,versions).await
}

#[cfg(target_os = "windows")]
pub async fn connect(pipe_name: impl AsRef<std::ffi::OsStr>, magic: u64) -> Result<Self, Error> {
debug!("connecting");

let bearer = Bearer::connect_named_pipe(pipe_name)
.await
.map_err(Error::ConnectFailure)?;

let versions =
handshake::n2c::VersionTable::only_v10(magic);

Self::connect_bearer(bearer,versions).await

}

#[cfg(not(target_os = "windows"))]
pub async fn handshake_query(
path: impl AsRef<Path>,
Expand Down Expand Up @@ -245,7 +274,8 @@ impl NodeClient {
}
}

/// Server of N2C Ouroboros
/// Server of N2C Ouroboros.
#[cfg(not(target_os = "windows"))]
pub struct NodeServer {
pub plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub version: (VersionNumber, n2c::VersionData),

Check failure on line 281 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable)

failed to resolve: use of undeclared crate or module `n2c`

Check failure on line 281 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Test Suite

failed to resolve: use of undeclared crate or module `n2c`

Check failure on line 281 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (macOS-latest, stable)

failed to resolve: use of undeclared crate or module `n2c`
Expand Down
38 changes: 34 additions & 4 deletions pallas-network/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use byteorder::{ByteOrder, NetworkEndian};
use pallas_codec::{minicbor, Fragment};
use std::net::SocketAddr;
use std::path::Path;
use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
Expand All @@ -15,6 +14,12 @@ use tracing::{debug, error, trace};
#[cfg(not(target_os = "windows"))]
use tokio::net::{UnixListener, UnixStream};

#[cfg(not(target_os = "windows"))]
use tokio::net::UnixListener;

Check failure on line 18 in pallas-network/src/multiplexer.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable)

the name `UnixListener` is defined multiple times

Check warning on line 18 in pallas-network/src/multiplexer.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable)

unused import: `tokio::net::UnixListener`

Check failure on line 18 in pallas-network/src/multiplexer.rs

View workflow job for this annotation

GitHub Actions / Test Suite

the name `UnixListener` is defined multiple times

Check warning on line 18 in pallas-network/src/multiplexer.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `tokio::net::UnixListener`

Check failure on line 18 in pallas-network/src/multiplexer.rs

View workflow job for this annotation

GitHub Actions / Check (macOS-latest, stable)

the name `UnixListener` is defined multiple times

Check warning on line 18 in pallas-network/src/multiplexer.rs

View workflow job for this annotation

GitHub Actions / Check (macOS-latest, stable)

unused import: `tokio::net::UnixListener`

#[cfg(windows)]
use tokio::net::windows::named_pipe::NamedPipeClient;

const HEADER_LEN: usize = 8;

pub type Timestamp = u32;
Expand Down Expand Up @@ -63,6 +68,7 @@ pub struct Segment {
#[cfg(target_os = "windows")]
pub enum Bearer {
Tcp(TcpStream),
NamedPipe(NamedPipeClient)
}

#[cfg(not(target_os = "windows"))]
Expand Down Expand Up @@ -92,17 +98,35 @@ impl Bearer {
Ok((Self::Unix(stream), addr))
}

#[cfg(windows)]
pub async fn connect_named_pipe(pipe_name: impl AsRef<std::ffi::OsStr>) -> Result<Self, tokio::io::Error>{
let client = loop {
match tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PIPE_BUSY as i32) => (),
Err(e) => return Err(e),
}

tokio::time::sleep(std::time::Duration::from_millis(50)).await;
};
Ok(Self::NamedPipe(client))
}

#[cfg(not(target_os = "windows"))]
pub async fn connect_unix(path: impl AsRef<Path>) -> Result<Self, tokio::io::Error> {
pub async fn connect_unix(path: impl AsRef<std::path::Path>) -> Result<Self, tokio::io::Error> {
let stream = UnixStream::connect(path).await?;
Ok(Self::Unix(stream))
}

pub async fn readable(&self) -> tokio::io::Result<()> {
pub async fn readable(&mut self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.readable().await,
#[cfg(not(target_os = "windows"))]
Bearer::Unix(x) => x.readable().await,

#[cfg(target_os = "windows")]
Bearer::NamedPipe(x) => x.readable().await

}
}

Expand All @@ -111,6 +135,8 @@ impl Bearer {
Bearer::Tcp(x) => x.try_read(buf),
#[cfg(not(target_os = "windows"))]
Bearer::Unix(x) => x.try_read(buf),
#[cfg(target_os = "windows")]
Bearer::NamedPipe(x) => x.try_read(buf)
}
}

Expand All @@ -119,6 +145,8 @@ impl Bearer {
Bearer::Tcp(x) => x.write_all(buf).await,
#[cfg(not(target_os = "windows"))]
Bearer::Unix(x) => x.write_all(buf).await,
#[cfg(target_os = "windows")]
Bearer::NamedPipe(x) => x.write_all(buf).await,
}
}

Expand All @@ -127,6 +155,8 @@ impl Bearer {
Bearer::Tcp(x) => x.flush().await,
#[cfg(not(target_os = "windows"))]
Bearer::Unix(x) => x.flush().await,
#[cfg(target_os = "windows")]
Bearer::NamedPipe(x) => x.flush().await,
}
}
}
Expand Down Expand Up @@ -173,7 +203,7 @@ impl SegmentBuffer {

let remaining = required - self.1.len();
let mut buf = vec![0u8; remaining];

match self.0.try_read(&mut buf) {
Ok(0) => {
error!("empty bearer");
Expand Down

0 comments on commit d6bcffe

Please sign in to comment.