Skip to content

Commit

Permalink
Windows support
Browse files Browse the repository at this point in the history
Co-authored-by: Fredrik Enestad <fredrik@enestad.com>
  • Loading branch information
eaufavor and fredr committed Oct 11, 2024
1 parent 2896879 commit 4aadba1
Show file tree
Hide file tree
Showing 23 changed files with 629 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
aa7c01c0f1c7a33d4ec55cf3531997e4cb2542ab
ca6a894f27f4448b3d0fe7ab3d84221133e83e5b
36 changes: 32 additions & 4 deletions pingora-core/src/connectors/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ use log::debug;
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
use rand::seq::SliceRandom;
use std::net::SocketAddr as InetSocketAddr;
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;

#[cfg(unix)]
use crate::protocols::l4::ext::connect_uds;
use crate::protocols::l4::ext::{
connect_uds, connect_with as tcp_connect, set_dscp, set_recv_buf, set_tcp_fastopen_connect,
connect_with as tcp_connect, set_dscp, set_recv_buf, set_tcp_fastopen_connect,
};
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::l4::stream::Stream;
Expand Down Expand Up @@ -102,16 +107,21 @@ where
match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref(), |socket| {
#[cfg(unix)]
let raw = socket.as_raw_fd();
#[cfg(windows)]
let raw = socket.as_raw_socket();

if peer.tcp_fast_open() {
set_tcp_fastopen_connect(socket.as_raw_fd())?;
set_tcp_fastopen_connect(raw)?;
}
if let Some(recv_buf) = peer.tcp_recv_buf() {
debug!("Setting recv buf size");
set_recv_buf(socket.as_raw_fd(), recv_buf)?;
set_recv_buf(raw, recv_buf)?;
}
if let Some(dscp) = peer.dscp() {
debug!("Setting dscp");
set_dscp(socket.as_raw_fd(), dscp)?;
set_dscp(raw, dscp)?;
}
Ok(())
});
Expand All @@ -137,6 +147,7 @@ where
}
}
}
#[cfg(unix)]
SocketAddr::Unix(addr) => {
let connect_future = connect_uds(
addr.as_pathname()
Expand Down Expand Up @@ -179,7 +190,10 @@ where
}
stream.set_nodelay()?;

#[cfg(unix)]
let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
#[cfg(windows)]
let digest = SocketDigest::from_raw_socket(stream.as_raw_socket());
digest
.peer_addr
.set(Some(peer_addr.clone()))
Expand Down Expand Up @@ -217,6 +231,7 @@ pub(crate) fn bind_to_random<P: Peer>(
InetSocketAddr::V4(_) => bind_to_ips(v4_list),
InetSocketAddr::V6(_) => bind_to_ips(v6_list),
},
#[cfg(unix)]
SocketAddr::Unix(_) => None,
};

Expand All @@ -235,6 +250,7 @@ pub(crate) fn bind_to_random<P: Peer>(

use crate::protocols::raw_connect;

#[cfg(unix)]
async fn proxy_connect<P: Peer>(peer: &P) -> Result<Stream> {
// safe to unwrap
let proxy = peer.get_proxy().unwrap();
Expand Down Expand Up @@ -275,13 +291,19 @@ async fn proxy_connect<P: Peer>(peer: &P) -> Result<Stream> {
Ok(*stream)
}

#[cfg(windows)]
async fn proxy_connect<P: Peer>(peer: &P) -> Result<Stream> {
panic!("peer proxy not supported on windows")
}

#[cfg(test)]
mod tests {
use super::*;
use crate::upstreams::peer::{BasicPeer, HttpPeer, Proxy};
use std::collections::BTreeMap;
use std::path::PathBuf;
use tokio::io::AsyncWriteExt;
#[cfg(unix)]
use tokio::net::UnixListener;

#[tokio::test]
Expand Down Expand Up @@ -359,6 +381,7 @@ mod tests {
assert!(new_session.is_ok());
}

#[cfg(unix)]
#[tokio::test]
async fn test_connect_proxy_fail() {
let mut peer = HttpPeer::new("1.1.1.1:80".to_string(), false, "".to_string());
Expand All @@ -376,9 +399,11 @@ mod tests {
assert!(!e.retry());
}

#[cfg(unix)]
const MOCK_UDS_PATH: &str = "/tmp/test_unix_connect_proxy.sock";

// one-off mock server
#[cfg(unix)]
async fn mock_connect_server() {
let _ = std::fs::remove_file(MOCK_UDS_PATH);
let listener = UnixListener::bind(MOCK_UDS_PATH).unwrap();
Expand Down Expand Up @@ -410,10 +435,12 @@ mod tests {
assert!(new_session.is_ok());
}

#[cfg(unix)]
const MOCK_BAD_UDS_PATH: &str = "/tmp/test_unix_bad_connect_proxy.sock";

// one-off mock bad proxy
// closes connection upon accepting
#[cfg(unix)]
async fn mock_connect_bad_server() {
let _ = std::fs::remove_file(MOCK_BAD_UDS_PATH);
let listener = UnixListener::bind(MOCK_BAD_UDS_PATH).unwrap();
Expand All @@ -424,6 +451,7 @@ mod tests {
let _ = std::fs::remove_file(MOCK_BAD_UDS_PATH);
}

#[cfg(unix)]
#[tokio::test(flavor = "multi_thread")]
async fn test_connect_proxy_conn_closed() {
tokio::spawn(async {
Expand Down
21 changes: 21 additions & 0 deletions pingora-core/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,29 @@ impl TransportConnector {
let mut stream = l.into_inner();
// test_reusable_stream: we assume server would never actively send data
// first on an idle stream.
#[cfg(unix)]
if peer.matches_fd(stream.id()) && test_reusable_stream(&mut stream) {
Some(stream)
} else {
None
}
#[cfg(windows)]
{
use std::os::windows::io::{AsRawSocket, RawSocket};
struct WrappedRawSocket(RawSocket);
impl AsRawSocket for WrappedRawSocket {
fn as_raw_socket(&self) -> RawSocket {
self.0
}
}
if peer.matches_sock(WrappedRawSocket(stream.id() as RawSocket))
&& test_reusable_stream(&mut stream)
{
Some(stream)
} else {
None
}
}
}
Err(_) => {
error!("failed to acquire reusable stream");
Expand Down Expand Up @@ -373,6 +391,7 @@ mod tests {
use crate::tls::ssl::SslMethod;
use crate::upstreams::peer::BasicPeer;
use tokio::io::AsyncWriteExt;
#[cfg(unix)]
use tokio::net::UnixListener;

// 192.0.2.1 is effectively a black hole
Expand Down Expand Up @@ -404,9 +423,11 @@ mod tests {
assert!(reused);
}

#[cfg(unix)]
const MOCK_UDS_PATH: &str = "/tmp/test_unix_transport_connector.sock";

// one-off mock server
#[cfg(unix)]
async fn mock_connect_server() {
let _ = std::fs::remove_file(MOCK_UDS_PATH);
let listener = UnixListener::bind(MOCK_UDS_PATH).unwrap();
Expand Down
48 changes: 44 additions & 4 deletions pingora-core/src/listeners/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@ use pingora_error::{
use std::fs::Permissions;
use std::io::ErrorKind;
use std::net::{SocketAddr, ToSocketAddrs};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd};
#[cfg(unix)]
use std::os::unix::net::UnixListener as StdUnixListener;
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket};
use std::time::Duration;
use tokio::net::TcpSocket;

use crate::protocols::l4::ext::{set_dscp, set_tcp_fastopen_backlog};
use crate::protocols::l4::listener::Listener;
pub use crate::protocols::l4::stream::Stream;
use crate::protocols::TcpKeepalive;
#[cfg(unix)]
use crate::server::ListenFds;

const TCP_LISTENER_MAX_TRY: usize = 30;
Expand All @@ -40,13 +45,15 @@ const LISTENER_BACKLOG: u32 = 65535;
#[derive(Clone, Debug)]
pub enum ServerAddress {
Tcp(String, Option<TcpSocketOptions>),
#[cfg(unix)]
Uds(String, Option<Permissions>),
}

impl AsRef<str> for ServerAddress {
fn as_ref(&self) -> &str {
match &self {
Self::Tcp(l, _) => l,
#[cfg(unix)]
Self::Uds(l, _) => l,
}
}
Expand Down Expand Up @@ -82,6 +89,7 @@ pub struct TcpSocketOptions {
// TODO: allow configuring reuseaddr, backlog, etc. from here?
}

#[cfg(unix)]
mod uds {
use super::{OrErr, Result};
use crate::protocols::l4::listener::Listener;
Expand Down Expand Up @@ -149,27 +157,35 @@ fn apply_tcp_socket_options(sock: &TcpSocket, opt: Option<&TcpSocketOptions>) ->
.set_only_v6(ipv6_only)
.or_err(BindError, "failed to set IPV6_V6ONLY")?;
}
#[cfg(unix)]
let raw = sock.as_raw_fd();
#[cfg(windows)]
let raw = sock.as_raw_socket();

if let Some(backlog) = opt.tcp_fastopen {
set_tcp_fastopen_backlog(sock.as_raw_fd(), backlog)?;
set_tcp_fastopen_backlog(raw, backlog)?;
}

if let Some(dscp) = opt.dscp {
set_dscp(sock.as_raw_fd(), dscp)?;
set_dscp(raw, dscp)?;
}
Ok(())
}

fn from_raw_fd(address: &ServerAddress, fd: i32) -> Result<Listener> {
match address {
#[cfg(unix)]
ServerAddress::Uds(addr, perm) => {
let std_listener = unsafe { StdUnixListener::from_raw_fd(fd) };
// set permissions just in case
uds::set_perms(addr, perm.clone())?;
Ok(uds::set_backlog(std_listener, LISTENER_BACKLOG)?.into())
}
ServerAddress::Tcp(_, _) => {
#[cfg(unix)]
let std_listener_socket = unsafe { std::net::TcpStream::from_raw_fd(fd) };
#[cfg(windows)]
let std_listener_socket = unsafe { std::net::TcpStream::from_raw_socket(fd as u64) };
let listener_socket = TcpSocket::from_std_stream(std_listener_socket);
// Note that we call listen on an already listening socket
// POSIX undefined but on Linux it will update the backlog size
Expand Down Expand Up @@ -231,6 +247,7 @@ async fn bind_tcp(addr: &str, opt: Option<TcpSocketOptions>) -> Result<Listener>

async fn bind(addr: &ServerAddress) -> Result<Listener> {
match addr {
#[cfg(unix)]
ServerAddress::Uds(l, perm) => uds::bind(l, perm.clone()),
ServerAddress::Tcp(l, opt) => bind_tcp(l, opt.clone()).await,
}
Expand All @@ -253,6 +270,7 @@ impl ListenerEndpoint {
self.listen_addr.as_ref()
}

#[cfg(unix)]
pub async fn listen(&mut self, fds: Option<ListenFds>) -> Result<()> {
if self.listener.is_some() {
return Ok(());
Expand All @@ -278,6 +296,12 @@ impl ListenerEndpoint {
Ok(())
}

#[cfg(windows)]
pub async fn listen(&mut self) -> Result<()> {
self.listener = Some(bind(&self.listen_addr).await?);
Ok(())
}

fn apply_stream_settings(&self, stream: &mut Stream) -> Result<()> {
// settings are applied based on whether the underlying stream supports it
stream.set_nodelay()?;
Expand All @@ -288,7 +312,10 @@ impl ListenerEndpoint {
stream.set_keepalive(ka)?;
}
if let Some(dscp) = op.dscp {
#[cfg(unix)]
set_dscp(stream.as_raw_fd(), dscp)?;
#[cfg(windows)]
set_dscp(stream.as_raw_socket(), dscp)?;
}
Ok(())
}
Expand All @@ -315,7 +342,13 @@ mod test {
async fn test_listen_tcp() {
let addr = "127.0.0.1:7100";
let mut listener = ListenerEndpoint::new(ServerAddress::Tcp(addr.into(), None));
listener.listen(None).await.unwrap();
listener
.listen(
#[cfg(unix)]
None,
)
.await
.unwrap();
tokio::spawn(async move {
// just try to accept once
listener.accept().await.unwrap();
Expand All @@ -332,7 +365,13 @@ mod test {
..Default::default()
});
let mut listener = ListenerEndpoint::new(ServerAddress::Tcp("[::]:7101".into(), sock_opt));
listener.listen(None).await.unwrap();
listener
.listen(
#[cfg(unix)]
None,
)
.await
.unwrap();
tokio::spawn(async move {
// just try to accept twice
listener.accept().await.unwrap();
Expand All @@ -346,6 +385,7 @@ mod test {
.expect("can connect to v6 addr");
}

#[cfg(unix)]
#[tokio::test]
async fn test_listen_uds() {
let addr = "/tmp/test_listen_uds";
Expand Down
Loading

0 comments on commit 4aadba1

Please sign in to comment.