From fb63af2f1cb06fd7db31ea3cde58ee0804f888f2 Mon Sep 17 00:00:00 2001 From: zonyitoo Date: Sat, 26 Jun 2021 00:43:54 +0800 Subject: [PATCH] clearing write readiness if TFO connect returns EINPROGRESS - ref #555 - imperfect until tokio-rs/tokio#3888 was merged --- Cargo.lock | 14 +-- Cargo.toml | 3 + .../src/net/sys/unix/bsd/freebsd.rs | 66 ++++++----- .../shadowsocks/src/net/sys/unix/bsd/macos.rs | 56 +++++---- .../shadowsocks/src/net/sys/unix/linux/mod.rs | 107 ++++++++++-------- crates/shadowsocks/src/net/sys/windows/mod.rs | 100 ++++++++-------- .../shadowsocks/src/relay/tcprelay/utils.rs | 4 +- 7 files changed, 187 insertions(+), 163 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9f171838d75..3e48bcc1997d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,9 +531,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" dependencies = [ "libc", ] @@ -1015,9 +1015,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" [[package]] name = "openssl-sys" -version = "0.9.64" +version = "0.9.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "209efc2fe0e980c8849efacdb567f975a1c80245c4f6980d6f012733bfa851af" +checksum = "7a7907e3bfa08bb85105209cdfcb6c63d109f8f6c1ed6ca318fff5c1853fbc1d" dependencies = [ "autocfg", "cc", @@ -1814,8 +1814,7 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fb2ed024293bb19f7a5dc54fe83bf86532a44c12a2bb8ba40d64a4509395ca2" +source = "git+https://github.com/zonyitoo/tokio.git#47b76a5ae7a9603dd81579d56938fd3e5ca4266e" dependencies = [ "autocfg", "bytes", @@ -1844,8 +1843,7 @@ dependencies = [ [[package]] name = "tokio-macros" version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" +source = "git+https://github.com/zonyitoo/tokio.git#47b76a5ae7a9603dd81579d56938fd3e5ca4266e" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 6dcfe64c690c..f119462531f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,3 +141,6 @@ byteorder = "1.3" env_logger = "0.8" byte_string = "1.0" tokio = { version = "1", features = ["net", "time", "macros", "io-util"]} + +[patch.crates-io] +tokio = { git = "https://github.com/zonyitoo/tokio.git" } diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs b/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs index cd4c6a523002..83424ca1fcb3 100644 --- a/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs +++ b/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs @@ -29,7 +29,7 @@ enum TcpStreamState { } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -105,25 +105,25 @@ impl AsyncRead for TcpStream { } impl AsyncWrite for TcpStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - loop { - let this = self.as_mut().project(); + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + let TcpStreamProj { inner, state } = self.project(); - match this.state { - TcpStreamState::FastOpenConnect(addr) => { - // TCP_FASTOPEN was supported since FreeBSD 12.0 - // - // Example program: - // + match *state { + TcpStreamState::Connected => inner.poll_write(cx, buf), - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; + TcpStreamState::FastOpenConnect(addr) => { + // TCP_FASTOPEN was supported since FreeBSD 12.0 + // + // Example program: + // - unsafe { - let saddr = SockAddr::from(*addr); + let saddr = SockAddr::from(addr); + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { let ret = libc::sendto( - this.inner.as_raw_fd(), + stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0, // Yes, BSD doesn't need MSG_FASTOPEN @@ -132,34 +132,32 @@ impl AsyncWrite for TcpStream { ); if ret >= 0 { - // Connect successfully. - *(this.state) = TcpStreamState::Connected; - return Ok(ret as usize).into(); + Ok(ret as usize) } else { // Error occurs let err = io::Error::last_os_error(); - // EAGAIN, EWOULDBLOCK - if err.kind() != ErrorKind::WouldBlock { - // EINPROGRESS - if let Some(libc::EINPROGRESS) = err.raw_os_error() { - // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. - // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). - // - // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; - } else { - // Other errors - return Err(err).into(); - } + // EINPROGRESS + if let Some(libc::EINPROGRESS) = err.raw_os_error() { + // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. + // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). + // + // So in this state. We have to loop again to call `poll_write` for sending the first packet. + *state = TcpStreamState::Connected; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) } else { - // Pending on poll_write_ready + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) } } } - } + }))?; - TcpStreamState::Connected => return this.inner.poll_write(cx, buf), + // Connect successfully with fast open + *state = TcpStreamState::Connected; + Ok(n).into() } } } diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs b/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs index dce91dfc731f..06bd51dd636c 100644 --- a/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs +++ b/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs @@ -30,7 +30,7 @@ enum TcpStreamState { } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -117,24 +117,27 @@ impl AsyncRead for TcpStream { } impl AsyncWrite for TcpStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - loop { - let this = self.as_mut().project(); - - match this.state { - TcpStreamState::FastOpenWrite => { - // `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`, - // so the first call of `send` will perform the actual SYN with TFO cookie. - // - // (NOT SURE) If remote server doesn't support TFO or this is the first connection, - // it may return EINPROGRESS just like other platforms (Linux, FreeBSD). - - match ready!(this.inner.poll_write(cx, buf)) { - Ok(n) => { - *(this.state) = TcpStreamState::Connected; - return Ok(n).into(); - } - Err(err) => { + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + let TcpStreamProj { inner, state } = self.project(); + + match *state { + TcpStreamState::Connected => inner.poll_write(cx, buf), + + TcpStreamState::FastOpenWrite => { + // `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`, + // so the first call of `send` will perform the actual SYN with TFO cookie. + // + // (NOT SURE) If remote server doesn't support TFO or this is the first connection, + // it may return EINPROGRESS just like other platforms (Linux, FreeBSD). + + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { + let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0); + if ret >= 0 { + Ok(ret as usize) + } else { + let err = io::Error::last_os_error(); // EAGAIN and EWOULDBLOCK should have been handled by tokio // // EINPROGRESS @@ -143,16 +146,21 @@ impl AsyncWrite for TcpStream { // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). // // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; + *state = TcpStreamState::Connected; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) } else { - // Other errors - return Err(err).into(); + // Other errors, including EAGAIN + Err(err) } } } - } + }))?; - TcpStreamState::Connected => return this.inner.poll_write(cx, buf), + // Connected successfully with fast open + *state = TcpStreamState::Connected; + Ok(n).into() } } } diff --git a/crates/shadowsocks/src/net/sys/unix/linux/mod.rs b/crates/shadowsocks/src/net/sys/unix/linux/mod.rs index b4739927475d..c211ec0450d9 100644 --- a/crates/shadowsocks/src/net/sys/unix/linux/mod.rs +++ b/crates/shadowsocks/src/net/sys/unix/linux/mod.rs @@ -32,7 +32,7 @@ enum TcpStreamState { } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -177,24 +177,24 @@ impl AsyncRead for TcpStream { } impl AsyncWrite for TcpStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - loop { - let this = self.as_mut().project(); + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { + let TcpStreamProj { inner, state } = self.project(); - match this.state { - TcpStreamState::FastOpenConnect(addr) => { - // Fallback mode. Must be kernal < 4.11 - // - // Uses sendto as BSD-like systems + match *state { + TcpStreamState::Connected => inner.poll_write(cx, buf), - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; + TcpStreamState::FastOpenConnect(addr) => { + // Fallback mode. Must be kernal < 4.11 + // + // Uses sendto as BSD-like systems - unsafe { - let saddr = SockAddr::from(*addr); + let saddr = SockAddr::from(addr); + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { let ret = libc::sendto( - this.inner.as_raw_fd(), + stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), libc::MSG_FASTOPEN, @@ -203,57 +203,68 @@ impl AsyncWrite for TcpStream { ); if ret >= 0 { - // Connect successfully. - *(this.state) = TcpStreamState::Connected; - return Ok(ret as usize).into(); + Ok(ret as usize) } else { // Error occurs let err = io::Error::last_os_error(); - // EAGAIN, EWOULDBLOCK - if err.kind() != ErrorKind::WouldBlock { - // EINPROGRESS - if let Some(libc::EINPROGRESS) = err.raw_os_error() { - // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. - // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). - // - // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; - } else { - // Other errors - return Err(err).into(); - } + // EINPROGRESS + if let Some(libc::EINPROGRESS) = err.raw_os_error() { + // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. + // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). + // + // So in this state. We have to loop again to call `poll_write` for sending the first packet. + *state = TcpStreamState::Connected; + + // Let `try_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) } else { - // Pending on poll_write_ready + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) } } } - } + }))?; - TcpStreamState::FastOpenWrite => { - // First `write` after `TCP_FASTOPEN_CONNECT` - // Kernel >= 4.11 + // Connect successfully with fast open + *state = TcpStreamState::Connected; + Ok(n).into() + } - match ready!(this.inner.poll_write(cx, buf)) { - Ok(n) => { - *(this.state) = TcpStreamState::Connected; - return Ok(n).into(); - } - Err(err) => { - // EAGAIN and EWOULDBLOCK should have been handled by tokio - // + TcpStreamState::FastOpenWrite => { + // First `write` after `TCP_FASTOPEN_CONNECT` + // Kernel >= 4.11 + + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { + let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0); + + if ret >= 0 { + Ok(ret as usize) + } else { + let err = io::Error::last_os_error(); // EINPROGRESS if let Some(libc::EINPROGRESS) = err.raw_os_error() { - // loop again to call `poll_write` for sending the first packet - *(this.state) = TcpStreamState::Connected; + // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. + // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). + // + // So in this state. We have to loop again to call `poll_write` for sending the first packet. + *state = TcpStreamState::Connected; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) } else { - return Err(err).into(); + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) } } } - } + }))?; - TcpStreamState::Connected => return this.inner.poll_write(cx, buf), + // Connect successfully with fast open + *state = TcpStreamState::Connected; + Ok(n).into() } } } diff --git a/crates/shadowsocks/src/net/sys/windows/mod.rs b/crates/shadowsocks/src/net/sys/windows/mod.rs index 96d272b5f247..e7f4e0936d53 100644 --- a/crates/shadowsocks/src/net/sys/windows/mod.rs +++ b/crates/shadowsocks/src/net/sys/windows/mod.rs @@ -112,7 +112,7 @@ unsafe impl Send for TcpStreamState {} unsafe impl Sync for TcpStreamState {} /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -228,22 +228,22 @@ fn set_update_connect_context(sock: SOCKET) -> io::Result<()> { } impl AsyncWrite for TcpStream { - fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { - let this = self.project(); - + fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll> { loop { - match this.state { - TcpStreamState::Connected => { - return this.inner.poll_write(cx, buf); - } + let TcpStreamProj { inner, state } = self.as_mut().project(); + + match *state { + TcpStreamState::Connected => return inner.poll_write(cx, buf), + TcpStreamState::FastOpenConnect(addr) => { + let saddr = SockAddr::from(addr); + unsafe { // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex let connect_ex = PFN_CONNECTEX_OPT .expect("LPFN_CONNECTEX function doesn't exist. It is only supported after Windows 10"); - let saddr = SockAddr::from(*addr); - let sock = this.inner.as_raw_socket() as SOCKET; + let sock = inner.as_raw_socket() as SOCKET; let mut overlapped: Box = Box::new(mem::zeroed()); @@ -266,7 +266,7 @@ impl AsyncWrite for TcpStream { debug_assert!(bytes_sent as usize <= buf.len()); - *(this.state) = TcpStreamState::Connected; + *state = TcpStreamState::Connected; return Ok(bytes_sent as usize).into(); } @@ -276,47 +276,53 @@ impl AsyncWrite for TcpStream { } // ConnectEx pending (ERROR_IO_PENDING), check later in FastOpenConnecting - *(this.state) = TcpStreamState::FastOpenConnecting(overlapped); + *state = TcpStreamState::FastOpenConnecting(overlapped); } } - TcpStreamState::FastOpenConnecting(ref mut overlapped) => { - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; - - unsafe { - let sock = this.inner.as_raw_socket() as SOCKET; - - let mut bytes_sent: DWORD = 0; - let mut flags: DWORD = 0; - - // Fetch ConnectEx's result in a non-blocking way. - let ret: BOOL = WSAGetOverlappedResult( - sock, - overlapped.as_mut() as LPOVERLAPPED, - &mut bytes_sent as LPDWORD, - FALSE, // fWait = false, non-blocking, returns WSA_IO_INCOMPLETE - &mut flags as LPDWORD, - ); - - if ret == TRUE { - // Get ConnectEx's result successfully. Socket is connected - - // Make getpeername() works - set_update_connect_context(sock)?; - - debug_assert!(bytes_sent as usize <= buf.len()); - *(this.state) = TcpStreamState::Connected; - return Ok(bytes_sent as usize).into(); + TcpStreamState::FastOpenConnecting(ref mut overlapped) => { + let n = ready!(inner.poll_write_io(cx, || { + unsafe { + let sock = inner.as_raw_socket() as SOCKET; + + let mut bytes_sent: DWORD = 0; + let mut flags: DWORD = 0; + + // Fetch ConnectEx's result in a non-blocking way. + let ret: BOOL = WSAGetOverlappedResult( + sock, + overlapped.as_mut() as LPOVERLAPPED, + &mut bytes_sent as LPDWORD, + FALSE, // fWait = false, non-blocking, returns WSA_IO_INCOMPLETE + &mut flags as LPDWORD, + ); + + if ret == TRUE { + // Get ConnectEx's result successfully. Socket is connected + + // Make getpeername() works + set_update_connect_context(sock)?; + + debug_assert!(bytes_sent as usize <= buf.len()); + + return Ok(bytes_sent as usize); + } + + let err = WSAGetLastError(); + if err == WSA_IO_INCOMPLETE { + // ConnectEx is still not connected. Wait for the next round + // + // Let `try_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) + } else { + Err(io::Error::from_raw_os_error(err)) + } } + }))?; - let err = WSAGetLastError(); - if err == WSA_IO_INCOMPLETE { - // ConnectEx is still not connected. Wait for the next round - } else { - return Err(io::Error::from_raw_os_error(err)).into(); - } - } + // Connect successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); } } } diff --git a/crates/shadowsocks/src/relay/tcprelay/utils.rs b/crates/shadowsocks/src/relay/tcprelay/utils.rs index 1aeec93989ef..0a5cddfa040e 100644 --- a/crates/shadowsocks/src/relay/tcprelay/utils.rs +++ b/crates/shadowsocks/src/relay/tcprelay/utils.rs @@ -247,7 +247,7 @@ where (Poll::Ready(a_to_b), Poll::Pending) => { // a -> b finished, then FIN have already sent to b, setting a read timeout on b - if let None = b.timeout() { + if b.timeout().is_none() { b.as_mut().set_timeout_pinned(Some(READ_TIMEOUT_WHEN_ONE_SHUTDOWN)); // poll again to ensure Waker have already registered to the timer @@ -261,7 +261,7 @@ where (Poll::Pending, Poll::Ready(b_to_a)) => { // b -> a finished, then FIN have already sent to a, setting a read timeout on a - if let None = a.timeout() { + if a.timeout().is_none() { a.as_mut().set_timeout_pinned(Some(READ_TIMEOUT_WHEN_ONE_SHUTDOWN)); // poll again to ensure Waker have already registered to the timer