From 9e64ad50a7052f9ebda10fc53ff54783b763ca18 Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Thu, 22 Dec 2022 02:40:31 -0800 Subject: [PATCH 1/3] Adding UDP over TCP and websocket mask mode Covering MaskMode tests in options Some cargo clippy fix --- Cargo.lock | 137 +++++++++++++++++++++++++++-------- cmd/Cargo.toml | 10 +-- cmd/README.md | 50 +++++++++---- cmd/src/client.rs | 146 ++++++++++++++++++------------------- cmd/src/lib.rs | 6 +- cmd/src/server.rs | 132 +++++++++++++++++----------------- kaminari/Cargo.toml | 11 ++- kaminari/src/mix.rs | 171 +++++++++++++++++++++++++++++++++++++++----- kaminari/src/opt.rs | 26 +++++-- kaminari/src/ws.rs | 80 +++++++++++++++------ 10 files changed, 530 insertions(+), 239 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2ce937d..70f54f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,12 @@ version = "0.20.0-alpha.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "149ea5dc24cb11513350770afebba32b68e3d2e356f9221351a2a1ee89112a82" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "block-buffer" version = "0.10.3" @@ -47,6 +53,12 @@ version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +[[package]] +name = "bytes" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" + [[package]] name = "cc" version = "1.0.77" @@ -119,6 +131,15 @@ dependencies = [ "wasi", ] +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + [[package]] name = "httparse" version = "1.8.0" @@ -140,7 +161,7 @@ version = "0.10.0" dependencies = [ "lazy_static", "lightws", - "rcgen 0.10.0", + "rcgen", "rustls-pemfile", "tokio", "tokio-rustls", @@ -148,30 +169,16 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "kaminari" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44cc956e6a3f76a9c548c8042ed739907225a9c8e778a65b2e1dca4b04e5492" -dependencies = [ - "lazy_static", - "lightws", - "rcgen 0.9.3", - "rustls-pemfile", - "tokio", - "tokio-rustls", - "webpki-roots", -] - [[package]] name = "kaminari-cmd" version = "0.5.7" dependencies = [ "anyhow", - "kaminari 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "kaminari", "realm_io", "realm_syscall", "tokio", + "udpflow", ] [[package]] @@ -200,6 +207,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.17" @@ -209,6 +226,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + [[package]] name = "mio" version = "0.8.5" @@ -221,12 +244,45 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + [[package]] name = "pem" version = "1.1.0" @@ -296,18 +352,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rcgen" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6413f3de1edee53342e6138e75b56d32e7bc6e332b3bd62d497b1929d4cfbcdd" -dependencies = [ - "pem", - "ring", - "time", - "yasna", -] - [[package]] name = "rcgen" version = "0.10.0" @@ -341,6 +385,15 @@ dependencies = [ "socket2", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] + [[package]] name = "ring" version = "0.16.20" @@ -377,6 +430,12 @@ dependencies = [ "base64 0.13.1", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "sct" version = "0.7.0" @@ -404,6 +463,21 @@ dependencies = [ "digest", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + [[package]] name = "socket2" version = "0.4.7" @@ -454,9 +528,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46" dependencies = [ "autocfg", + "bytes", "libc", + "memchr", "mio", + "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys", diff --git a/cmd/Cargo.toml b/cmd/Cargo.toml index cb94c1d..f4d4d90 100644 --- a/cmd/Cargo.toml +++ b/cmd/Cargo.toml @@ -12,8 +12,9 @@ license = "GPL-3.0" anyhow = "1" realm_io = "0.3.4" realm_syscall = "0.1.6" -kaminari = { version = "0.10", features = ["ws"] } -tokio = { version = "1.9", features = ["rt", "net", "macros"] } +kaminari = { path = "../kaminari", features = ["all"] } +tokio = { version = "1", features = ["full"] } +udpflow = "0.1" [[bin]] name = "kaminaric" @@ -22,8 +23,3 @@ path = "src/client.rs" [[bin]] name = "kaminaris" path = "src/server.rs" - -[features] -default = ["tls-rustls"] -tls-rustls = ["kaminari/tls"] -tls-openssl = [] diff --git a/cmd/README.md b/cmd/README.md index b8f038f..4dd8092 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -5,18 +5,18 @@ [![downloads](https://img.shields.io/github/downloads/zephyrchien/kaminari/total?color=green)](https://github.com/zephyrchien/kaminari/releases) [![telegram](https://img.shields.io/badge/-telegram-blue?style=flat&color=grey&logo=telegram)](https://t.me/+zKbZTvQE2XtiYmIx) -Blazing-fast websocket tunnel built on top of [lightws](https://github.com/zephyrchien/lightws). +A blazing fast TCP and UDP tunnel. ## Intro -- Client side receives tcp then sends [tcp/ws/tls/wss]. +- Client side listen on tcp or udp and sends traffics through [tcp/ws/tls/wss] to server side. -- Server side receives [tcp/ws/tls/wss] then sends tcp. +- Server side receives the traffics then sends them to desginated remote through tcp or udp . - Compatible with shadowsocks [SIP003 plugin](https://shadowsocks.org/guide/sip003.html). ```text - tcp ws/tls/wss tcp + tcp/udp ws/tls/wss/tcp tcp/udp === ============ === +-------------------+ +-------------------+ | | | | @@ -24,7 +24,7 @@ Blazing-fast websocket tunnel built on top of [lightws](https://github.com/zephy | kaminaric | | kaminaris | <-------+ <--------------+ <-------+ | | | | - +-------------------+ +-------------------+ + +-------------------+ +-------------------+ ``` ## Usage @@ -57,9 +57,9 @@ use `ws` to enable websocket. Client or server side options: -- `host=`* : set http host. +- `host=`\* : set http host. -- `path=`* : set http path. +- `path=`\* : set http path. Client side extra options: @@ -79,7 +79,7 @@ use `tls` to enable tls. Client side options: -- `sni=`* : set sni. +- `sni=`\* : set sni. - `alpn=`: set alpn. e.g.: `h2,http/1.1`. @@ -91,11 +91,11 @@ Server side options: Requires either `cert+key` or `servername`. -- `key=`* : private key path. +- `key=`\* : private key path. -- `cert=`* : certificate path. +- `cert=`\* : certificate path. -- `servername=`* : generate self signed cert/key, use $name as CN. +- `servername=`\* : generate self signed cert/key, use $name as CN. - `ocsp=`: der-encoded OCSP response. @@ -113,6 +113,12 @@ openssl ocsp -issuer \ -respout -noverify -no_nonce ``` +#### UDP Over TCP Options + +use `uot` to enable udp over tcp feature. + +Note: Currently, `uot` option works independently or with tls options. + ### Examples tcp ⇋ ws --- ws ⇋ tcp: @@ -147,6 +153,26 @@ kaminaris 127.0.0.1:20000 127.0.0.1:30000 'ws;host=example.com;path=/ws;tls;cert kaminaris 127.0.0.1:20000 127.0.0.1:30000 'ws;host=example.com;path=/ws;tls;servername=example.com' ``` +udp ⇋ tcp --- tcp ⇋ udp: + +```shell +kaminaric 127.0.0.1:10000 127.0.0.1:20000 'uot' + +kaminaris 127.0.0.1:20000 127.0.0.1:30000 'uot' +``` + +udp ⇋ tls --- tls ⇋ udp: + +```shell +kaminaric 127.0.0.1:10000 127.0.0.1:20000 'uot;tls;sni=example.com' + +# use cert + key +kaminaris 127.0.0.1:20000 127.0.0.1:30000 'uot;tls;cert=example.com.crt;key=example.com.key' + +# or generate self signed cert/key +kaminaris 127.0.0.1:20000 127.0.0.1:30000 'uot;tls;servername=example.com' +``` + shadowsocks plugin: ```shell @@ -161,7 +187,7 @@ sslocal -b "127.0.0.1:1080" -s "example.com:8080" -m "aes-128-gcm" -k "123456" \ --plugin-opts "ws;host=example.com;path=/chat" ``` -*To use `v2ray-plugin` on client side, add `mux=0` to disable multiplex, so that it sends standard websocket stream which can be handled by `kaminari` or any other middlewares. +\*To use `v2ray-plugin` on client side, add `mux=0` to disable multiplex, so that it sends standard websocket stream which can be handled by `kaminari` or any other middlewares. ```shell sslocal -b "127.0.0.1:1080" -s "example.com:8080" -m "aes-128-gcm" -k "123456" \ diff --git a/cmd/src/client.rs b/cmd/src/client.rs index 44f9ac3..6d78eec 100644 --- a/cmd/src/client.rs +++ b/cmd/src/client.rs @@ -2,122 +2,118 @@ use std::net::SocketAddr; use anyhow::Result; use tokio::net::{TcpListener, TcpStream}; -use realm_io::{CopyBuffer, bidi_copy_buf}; +use realm_io::{CopyBuffer, bidi_copy_buf, buf_size}; use kaminari::opt; -use kaminari::trick::Ref; use kaminari::AsyncConnect; -use kaminari::nop::NopConnect; -use kaminari::ws::WsConnect; -#[cfg(any(feature = "tls-rustls", feature = "tls-openssl"))] -use kaminari::tls::TlsConnect; - -use kaminari_cmd::{Endpoint, parse_cmd, parse_env}; +use kaminari::uot::UotConnect; +use kaminari::mix::{MixConnect, MixClientConf}; +use tokio::net::UdpSocket; +use udpflow::{UdpListener, UdpStreamLocal}; +use kaminari_cmd::{Endpoint, parse_cmd, parse_env, UDP_MAX_BUF_LENGTH}; + +enum Listener { + TcpListener(TcpListener), + UdpListener(UdpListener), +} -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() -> Result<()> { let (Endpoint { local, remote }, options) = parse_env().or_else(|_| parse_cmd())?; let ws = opt::get_ws_conf(&options); - #[cfg(any(feature = "tls-rustls", feature = "tls-openssl"))] let tls = opt::get_tls_client_conf(&options); eprintln!("listen: {}", &local); eprintln!("remote: {}", &remote); - if let Some(ws) = &ws { - eprintln!("ws: {}", ws) + if let Some(ref ws) = ws { + eprintln!("ws: {ws}") } - #[cfg(any(feature = "tls-rustls", feature = "tls-openssl"))] - if let Some(tls) = &tls { + if let Some(ref tls) = tls { eprintln!("tls: {}", &tls); } - let lis = TcpListener::bind(local).await?; + let connector = MixConnect::new_shared(MixClientConf { ws, tls }); + + let uot = opt::get_uot_conf(&options); + if uot.is_some() { + eprintln!("UDP over TCP enabled."); + } + + let lis = match uot { + Some(_) => { + let socket = UdpSocket::bind(local).await.unwrap(); + Listener::UdpListener(UdpListener::new(socket)) + } + None => Listener::TcpListener(TcpListener::bind(local).await.unwrap()), + }; #[cfg(all(unix, not(target_os = "android")))] let _ = realm_syscall::bump_nofile_limit(); - macro_rules! run { - ($cc: expr) => { - println!("connect: {}", $cc.as_ref()); - loop { - match lis.accept().await { + // let connector = Ref::new(&connector); + println!("connect: {}", &connector); + loop { + match lis { + Listener::TcpListener(ref lis) => match lis.accept().await { + Ok((stream, _)) => { + tokio::spawn(relay_tcp(stream, remote, connector.clone())); + } + Err(e) => { + eprintln!("accept error: {e}"); + break; + } + }, + Listener::UdpListener(ref lis) => { + let mut buf = vec![0u8; UDP_MAX_BUF_LENGTH]; + match lis.accept(&mut buf).await { Ok((stream, _)) => { - tokio::spawn(relay(stream, remote, $cc)); + tokio::spawn(relay_uot(stream, remote, connector.clone(), buf)); } Err(e) => { - eprintln!("accept error: {}", e); + eprintln!("accept error: {e}"); break; } } } - }; - } - - macro_rules! run_ws_each { - ($client: expr) => { - let ws_mask_mode = opt::get_opt!(&options => "mask"); - match ws_mask_mode { - Some("standard") => { - eprintln!("mask: standard"); - let client = $client.standard(); - run!(Ref::new(&client)); - }, - Some("fixed") => { - let client = $client.fixed(); - eprintln!("mask: fixed"); - run!(Ref::new(&client)); - }, - _ => { - eprintln!("mask: skip"); - run!(Ref::new(&$client)); - } - }; } } - #[cfg(any(feature = "tls-rustls", feature = "tls-openssl"))] - match (ws, tls) { - (None, None) => { - let client = NopConnect {}; - run!(Ref::new(&client)); - } - (Some(ws), None) => { - let client = WsConnect::new(NopConnect {}, ws); - run_ws_each!(client); - } - (None, Some(tls)) => { - let client = TlsConnect::new(NopConnect {}, tls); - run!(Ref::new(&client)); - } - (Some(ws), Some(tls)) => { - let client = WsConnect::new(TlsConnect::new(NopConnect {}, tls), ws); - run_ws_each!(client); - } - }; + Ok(()) +} - #[cfg(not(any(feature = "tls-rustls", feature = "tls-openssl")))] - if let Some(ws) = ws { - let client = WsConnect::new(NopConnect {}, ws); - run_ws_each!(client); - } else { - let client = NopConnect {}; - run!(Ref::new(&client)); - } +async fn relay_tcp(mut local: TcpStream, remote: SocketAddr, client: T) -> std::io::Result<()> +where + T: AsyncConnect, +{ + let mut buf1 = vec![0u8; buf_size()]; + let buf2 = vec![0u8; buf_size()]; - Ok(()) + let remote = TcpStream::connect(remote).await?; + let mut remote = client.connect(remote, &mut buf1).await?; + + let buf1 = CopyBuffer::new(buf1.into_boxed_slice()); + let buf2 = CopyBuffer::new(buf2.into_boxed_slice()); + + bidi_copy_buf(&mut local, &mut remote, buf1, buf2).await } -async fn relay(mut local: TcpStream, remote: SocketAddr, client: Ref) -> std::io::Result<()> +async fn relay_uot( + mut local: UdpStreamLocal, + remote: SocketAddr, + client: T, + mut buf1: Vec, +) -> std::io::Result<()> where T: AsyncConnect, { - let mut buf1 = vec![0u8; 0x2000]; - let buf2 = vec![0u8; 0x2000]; + println!("{} -> {remote}", local.peer_addr()); + let buf2 = vec![0u8; UDP_MAX_BUF_LENGTH]; let remote = TcpStream::connect(remote).await?; + let client = UotConnect::new(client); let mut remote = client.connect(remote, &mut buf1).await?; let buf1 = CopyBuffer::new(buf1.into_boxed_slice()); diff --git a/cmd/src/lib.rs b/cmd/src/lib.rs index 7382916..d60244e 100644 --- a/cmd/src/lib.rs +++ b/cmd/src/lib.rs @@ -2,6 +2,8 @@ use std::env; use std::net::{SocketAddr, ToSocketAddrs}; use anyhow::Result; +pub const UDP_MAX_BUF_LENGTH: usize = 0xffff; + pub struct Endpoint { pub local: SocketAddr, pub remote: SocketAddr, @@ -14,12 +16,12 @@ pub fn parse_env() -> Result<(Endpoint, String)> { let remote_port = env::var("SS_REMOTE_PORT")?; let plugin_opts = env::var("SS_PLUGIN_OPTIONS")?; - let local = format!("{}:{}", local_host, local_port) + let local = format!("{local_host}:{local_port}") .to_socket_addrs()? .next() .unwrap(); - let remote = format!("{}:{}", remote_host, remote_port) + let remote = format!("{remote_host}:{remote_port}") .to_socket_addrs()? .next() .unwrap(); diff --git a/cmd/src/server.rs b/cmd/src/server.rs index 93d0d62..a7ffc87 100644 --- a/cmd/src/server.rs +++ b/cmd/src/server.rs @@ -2,19 +2,24 @@ use std::net::SocketAddr; use anyhow::Result; use tokio::net::{TcpListener, TcpStream}; -use realm_io::{CopyBuffer, bidi_copy_buf}; +use realm_io::{CopyBuffer, bidi_copy_buf, buf_size}; use kaminari::opt; -use kaminari::trick::Ref; use kaminari::AsyncAccept; -use kaminari::nop::NopAccept; -use kaminari::ws::WsAccept; -#[cfg(any(feature = "tls-rustls", feature = "tls-openssl"))] -use kaminari::tls::TlsAccept; +use kaminari::uot::UotAccept; +use kaminari::mix::{MixAccept, MixServerConf}; +use tokio::net::UdpSocket; +use udpflow::UdpStreamRemote; -use kaminari_cmd::{Endpoint, parse_cmd, parse_env}; +use kaminari_cmd::{Endpoint, parse_cmd, parse_env, UDP_MAX_BUF_LENGTH}; -#[tokio::main(flavor = "current_thread")] +#[derive(Clone)] +enum Streamer { + TcpStream(SocketAddr), + UdpStream(SocketAddr), +} + +#[tokio::main] async fn main() -> Result<()> { let (Endpoint { local, remote }, options) = parse_env() .map(|(Endpoint { local, remote }, opt)| { @@ -29,89 +34,84 @@ async fn main() -> Result<()> { .or_else(|_| parse_cmd())?; let ws = opt::get_ws_conf(&options); - - #[cfg(any(feature = "tls-rustls", feature = "tls-openssl"))] let tls = opt::get_tls_server_conf(&options); eprintln!("listen: {}", &local); eprintln!("remote: {}", &remote); if let Some(ws) = &ws { - eprintln!("ws: {}", ws) + eprintln!("ws: {ws}") } - #[cfg(any(feature = "tls-rustls", feature = "tls-openssl"))] if let Some(tls) = &tls { eprintln!("tls: {}", &tls); } + let uot = opt::get_uot_conf(&options); + if uot.is_some() { + eprintln!("UDP over TCP enabled."); + } + + let acceptor = MixAccept::new_shared(MixServerConf { ws, tls }); + + let uot = opt::get_uot_conf(&options); + + let remote = match uot { + Some(_) => Streamer::UdpStream(remote), + None => Streamer::TcpStream(remote), + }; + let lis = TcpListener::bind(local).await?; #[cfg(all(unix, not(target_os = "android")))] let _ = realm_syscall::bump_nofile_limit(); - macro_rules! run { - ($ac: expr) => { - println!("accept: {}", $ac.as_ref()); - loop { - match lis.accept().await { - Ok((stream, _)) => { - tokio::spawn(relay(stream, remote, $ac)); - } - Err(e) => { - eprintln!("accept error: {}", e); - break; - } - } + println!("accept: {}", &acceptor); + loop { + match lis.accept().await { + Ok((stream, _)) => { + tokio::spawn(relay(stream, remote.clone(), acceptor.clone())); + } + Err(e) => { + eprintln!("accept error: {e}"); + break; } - }; - } - - #[cfg(any(feature = "tls-rustls", feature = "tls-openssl"))] - match (ws, tls) { - (None, None) => { - let server = NopAccept {}; - run!(Ref::new(&server)); - } - (Some(ws), None) => { - let server = WsAccept::new(NopAccept {}, ws); - run!(Ref::new(&server)); - } - (None, Some(tls)) => { - let server = TlsAccept::new(NopAccept {}, tls); - run!(Ref::new(&server)); - } - (Some(ws), Some(tls)) => { - let server = WsAccept::new(TlsAccept::new(NopAccept {}, tls), ws); - run!(Ref::new(&server)); } - }; - - #[cfg(not(any(feature = "tls-rustls", feature = "tls-openssl")))] - if let Some(ws) = ws { - let server = WsAccept::new(NopAccept {}, ws); - run!(Ref::new(&server)); - } else { - let server = NopAccept {}; - run!(Ref::new(&server)); } Ok(()) } -async fn relay>( - local: TcpStream, - remote: SocketAddr, - server: Ref, -) -> std::io::Result<()> { - let mut buf1 = vec![0u8; 0x2000]; - let buf2 = vec![0u8; 0x2000]; +async fn relay(local: TcpStream, remote: Streamer, server: T) -> std::io::Result<()> +where + T: AsyncAccept, +{ + match remote { + Streamer::TcpStream(remote) => { + let mut buf1 = vec![0u8; buf_size()]; + let buf2 = vec![0u8; buf_size()]; + let mut local = server.accept(local, &mut buf1).await?; + let mut remote = TcpStream::connect(remote).await?; + + let buf1 = CopyBuffer::new(buf1); + let buf2 = CopyBuffer::new(buf2); + + bidi_copy_buf(&mut local, &mut remote, buf1, buf2).await + } + Streamer::UdpStream(remote) => { + println!("{} -> {remote}", local.peer_addr()?); + let mut buf1 = vec![0u8; UDP_MAX_BUF_LENGTH]; + let buf2 = vec![0u8; UDP_MAX_BUF_LENGTH]; + let server = UotAccept::new(server); + let mut local = server.accept(local, &mut buf1).await?; - let mut local = server.accept(local, &mut buf1).await?; - let mut remote = TcpStream::connect(remote).await?; + let socket = UdpSocket::bind("127.0.0.1:0").await?; + let mut remote = UdpStreamRemote::new(socket, remote); - let buf1 = CopyBuffer::new(buf1.into_boxed_slice()); - let buf2 = CopyBuffer::new(buf2.into_boxed_slice()); + let buf1 = CopyBuffer::new(buf1.into_boxed_slice()); + let buf2 = CopyBuffer::new(buf2.into_boxed_slice()); - bidi_copy_buf(&mut local, &mut remote, buf1, buf2).await + bidi_copy_buf(&mut local, &mut remote, buf1, buf2).await + } + } } diff --git a/kaminari/Cargo.toml b/kaminari/Cargo.toml index 309864c..1ba070c 100644 --- a/kaminari/Cargo.toml +++ b/kaminari/Cargo.toml @@ -26,16 +26,21 @@ tokio = "1.9" lazy_static = "1" # ws -lightws = { version = "0.6", features = ["unsafe_auto_mask_write"], optional = true } +lightws = { version = "0.6", features = [ + "unsafe_auto_mask_write", +], optional = true } # uot udpflow = { version = "0.1.0", optional = true } # tls -tokio-rustls = { version = "0.23", features = ["early-data", "dangerous_configuration"], optional = true } +tokio-rustls = { version = "0.23", features = [ + "early-data", + "dangerous_configuration", +], optional = true } webpki-roots = { version = "0.22", optional = true } rustls-pemfile = { version = "1", optional = true } -rcgen = {version = "0.10", optional = true } +rcgen = { version = "0.10", optional = true } [package.metadata.docs.rs] all-features = true diff --git a/kaminari/src/mix.rs b/kaminari/src/mix.rs index 4bafe76..df7c6cd 100644 --- a/kaminari/src/mix.rs +++ b/kaminari/src/mix.rs @@ -2,6 +2,8 @@ use std::io::Result; use std::future::Future; use std::fmt::{Display, Formatter}; +use crate::ws; + use super::{IOStream, AsyncAccept, AsyncConnect}; use super::nop::{NopAccept, NopConnect}; use super::ws::{WsConf, WsAccept, WsConnect}; @@ -18,8 +20,12 @@ pub struct MixClientConf { pub enum MixConnect { Plain(NopConnect), Ws(WsConnect), + WsFixed(WsConnect), + WsStandard(WsConnect), Tls(TlsConnect), Wss(WsConnect>), + WssFixed(WsConnect, ws::Fixed>), + WssStandard(WsConnect, ws::Standard>), } impl MixConnect { @@ -28,9 +34,21 @@ impl MixConnect { let MixClientConf { ws, tls } = conf; match (ws, tls) { (None, None) => Plain(NopConnect {}), - (Some(ws), None) => Ws(WsConnect::new(NopConnect {}, ws)), + (Some(ws), None) => match ws.mask_mode { + ws::MaskMode::Skip => Ws(WsConnect::new(NopConnect {}, ws)), + ws::MaskMode::Fixed => WsFixed(WsConnect::new(NopConnect {}, ws)), + ws::MaskMode::Standard => WsStandard(WsConnect::new(NopConnect {}, ws)), + }, (None, Some(tls)) => Tls(TlsConnect::new(NopConnect {}, tls)), - (Some(ws), Some(tls)) => Wss(WsConnect::new(TlsConnect::new(NopConnect {}, tls), ws)), + (Some(ws), Some(tls)) => match ws.mask_mode { + ws::MaskMode::Skip => Wss(WsConnect::new(TlsConnect::new(NopConnect {}, tls), ws)), + ws::MaskMode::Fixed => { + WssFixed(WsConnect::new(TlsConnect::new(NopConnect {}, tls), ws)) + } + ws::MaskMode::Standard => { + WssStandard(WsConnect::new(TlsConnect::new(NopConnect {}, tls), ws)) + } + }, } } @@ -39,12 +57,26 @@ impl MixConnect { let MixClientConf { ws, tls } = conf; match (ws, tls) { (None, None) => Plain(NopConnect {}), - (Some(ws), None) => Ws(WsConnect::new(NopConnect {}, ws)), + (Some(ws), None) => match ws.mask_mode { + ws::MaskMode::Skip => Ws(WsConnect::new(NopConnect {}, ws)), + ws::MaskMode::Fixed => WsFixed(WsConnect::new(NopConnect {}, ws)), + ws::MaskMode::Standard => WsStandard(WsConnect::new(NopConnect {}, ws)), + }, (None, Some(tls)) => Tls(TlsConnect::new_shared(NopConnect {}, tls)), - (Some(ws), Some(tls)) => Wss(WsConnect::new( - TlsConnect::new_shared(NopConnect {}, tls), - ws, - )), + (Some(ws), Some(tls)) => match ws.mask_mode { + ws::MaskMode::Skip => Wss(WsConnect::new( + TlsConnect::new_shared(NopConnect {}, tls), + ws, + )), + ws::MaskMode::Fixed => WssFixed(WsConnect::new( + TlsConnect::new_shared(NopConnect {}, tls), + ws, + )), + ws::MaskMode::Standard => WssStandard(WsConnect::new( + TlsConnect::new_shared(NopConnect {}, tls), + ws, + )), + }, } } } @@ -62,8 +94,12 @@ impl AsyncConnect for MixConnect { match self { Plain(cc) => cc.connect(stream, buf).await.map(MixS::Plain), Ws(cc) => cc.connect(stream, buf).await.map(MixS::Ws), + WsFixed(cc) => cc.connect(stream, buf).await.map(MixS::WsFixed), + WsStandard(cc) => cc.connect(stream, buf).await.map(MixS::WsStandard), Tls(cc) => cc.connect(stream, buf).await.map(MixS::Tls), Wss(cc) => cc.connect(stream, buf).await.map(MixS::Wss), + WssFixed(cc) => cc.connect(stream, buf).await.map(MixS::WssFixed), + WssStandard(cc) => cc.connect(stream, buf).await.map(MixS::WssStandard), } } } @@ -138,15 +174,19 @@ mod stream { use std::pin::Pin; use std::task::{Poll, Context}; use tokio::io::{ReadBuf, AsyncRead, AsyncWrite}; - use crate::ws::{WsClientStream, WsServerStream}; + use crate::ws::{WsClientStream, WsServerStream, WsStandardClientStream, WsFixedClientStream}; use crate::tls::{TlsClientStream, TlsServerStream}; #[derive(Debug)] pub enum MixClientStream { Plain(T), Ws(WsClientStream), + WsFixed(WsFixedClientStream), + WsStandard(WsStandardClientStream), Tls(TlsClientStream), Wss(WsClientStream>), + WssFixed(WsFixedClientStream>), + WssStandard(WsStandardClientStream>), } #[derive(Debug)] @@ -174,7 +214,7 @@ mod stream { }; } - macro_rules! impl_async_read { + macro_rules! impl_async_read_server { ($stream: ident) => { impl AsyncRead for $stream { fn poll_read( @@ -189,7 +229,7 @@ mod stream { }; } - macro_rules! impl_async_write { + macro_rules! impl_async_write_server { ($stream: ident) => { impl AsyncWrite for $stream { fn poll_write( @@ -214,10 +254,96 @@ mod stream { }; } - impl_async_read!(MixClientStream); - impl_async_write!(MixClientStream); - impl_async_read!(MixServerStream); - impl_async_write!(MixServerStream); + macro_rules! impl_async_read_client { + ($stream: ident) => { + impl AsyncRead for $stream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + use $stream::*; + call_each!( + self || Plain, + Ws, + WsFixed, + WsStandard, + Tls, + Wss, + WssFixed, + WssStandard, + || poll_read, + cx, + buf + ) + } + } + }; + } + + macro_rules! impl_async_write_client { + ($stream: ident) => { + impl AsyncWrite for $stream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + use $stream::*; + call_each!( + self || Plain, + Ws, + WsFixed, + WsStandard, + Tls, + Wss, + WssFixed, + WssStandard, + || poll_write, + cx, + buf + ) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use $stream::*; + call_each!( + self || Plain, + Ws, + WsFixed, + WsStandard, + Tls, + Wss, + WssFixed, + WssStandard, + || poll_flush, + cx + ) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use $stream::*; + call_each!( + self || Plain, + Ws, + WsFixed, + WsStandard, + Tls, + Wss, + WssFixed, + WssStandard, + || poll_shutdown, + cx + ) + } + } + }; + } + + impl_async_read_client!(MixClientStream); + impl_async_write_client!(MixClientStream); + impl_async_read_server!(MixServerStream); + impl_async_write_server!(MixServerStream); } // ========== type cast ========== @@ -272,7 +398,16 @@ macro_rules! impl_display { }; } -impl_display!(MixConnect || Plain, Ws, Tls, Wss,); +impl_display!( + MixConnect || Plain, + Ws, + WsFixed, + WsStandard, + Tls, + Wss, + WssFixed, + WssStandard, +); impl_display!(MixAccept || Plain, Ws, Tls, Wss,); #[cfg(test)] @@ -285,6 +420,7 @@ mod test { ws: Some(WsConf { host: String::from("abc"), path: String::from("chat"), + mask_mode: ws::MaskMode::Skip, }), tls: Some(TlsClientConf { sni: String::from("abc"), @@ -299,7 +435,7 @@ mod test { let conn = MixConnect::new(conf); - println!("{}", conn); + println!("{conn}"); } #[test] @@ -308,6 +444,7 @@ mod test { ws: Some(WsConf { host: String::from("abc"), path: String::from("chat"), + mask_mode: ws::MaskMode::Skip, }), tls: Some(TlsServerConf { crt: String::new(), @@ -322,6 +459,6 @@ mod test { let lis = MixAccept::new(conf); - println!("{}", lis); + println!("{lis}"); } } diff --git a/kaminari/src/opt.rs b/kaminari/src/opt.rs index c0a30b8..1693b2e 100644 --- a/kaminari/src/opt.rs +++ b/kaminari/src/opt.rs @@ -33,6 +33,17 @@ macro_rules! get_opt { pub use has_opt; pub use get_opt; +#[cfg(feature = "uot")] +pub fn get_uot_conf(s: &str) -> Option<()> { + let it = s.split(';').map(|x| x.trim()); + + if !has_opt!(it.clone(), "uot") { + return None; + } + + Some(()) +} + #[cfg(feature = "ws")] pub fn get_ws_conf(s: &str) -> Option { let it = s.split(';').map(|x| x.trim()); @@ -41,6 +52,8 @@ pub fn get_ws_conf(s: &str) -> Option { return None; } + let mask_mode = get_opt!(it.clone(), "mask"); + let host = get_opt!(it.clone(), "host"); let path = get_opt!(it.clone(), "path"); @@ -48,6 +61,7 @@ pub fn get_ws_conf(s: &str) -> Option { Some(WsConf { host: String::from(host), path: String::from(path), + mask_mode: mask_mode.into(), }) } else { panic!("ws: require host and path") @@ -119,22 +133,24 @@ mod test { #[test] #[cfg(feature = "ws")] fn ws_conf() { + use crate::ws::MaskMode; macro_rules! y { - ( $( ($s:expr, $host: expr, $path: expr); )+ )=> { + ( $( ($s:expr, $host: expr, $path: expr, $mask: expr); )+ )=> { $( assert_eq!(get_ws_conf($s), Some(WsConf{ host: String::from($host), path: String::from($path), + mask_mode: $mask, })); )+ } } y![ - ("ws;host=a.b.c;path=/", "a.b.c", "/"); - ("ws;host=a.b.c;path=/abc", "a.b.c", "/abc"); - ("ws;path=/abc;host=a.b.c", "a.b.c", "/abc"); - ("ws;path=/abc;host=a.b.c;", "a.b.c", "/abc"); + ("ws;host=a.b.c;path=/", "a.b.c", "/", MaskMode::Skip); + ("ws;host=a.b.c;path=/abc;mask=standard", "a.b.c", "/abc", MaskMode::Standard); + ("ws;path=/abc;host=a.b.c;mask=fixed", "a.b.c", "/abc", MaskMode::Fixed); + ("ws;path=/abc;host=a.b.c;", "a.b.c", "/abc", MaskMode::Skip); ]; } diff --git a/kaminari/src/ws.rs b/kaminari/src/ws.rs index 9d748fc..794eafc 100644 --- a/kaminari/src/ws.rs +++ b/kaminari/src/ws.rs @@ -15,15 +15,61 @@ pub type WsClientStream = WsStream; pub type WsStandardClientStream = WsStream; pub type WsFixedClientStream = WsStream; +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MaskMode { + Skip, + Fixed, + Standard, +} + +impl From> for MaskMode { + fn from(item: Option<&str>) -> Self { + match item { + Some(item) => match item { + "skip" => Self::Skip, + "fixed" => Self::Fixed, + "standard" => Self::Standard, + _ => panic!("{item} mask mode is not supported."), + }, + None => Self::Skip, + } + } +} + +impl Display for MaskMode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + Self::Skip => "skip", + Self::Fixed => "fixed", + Self::Standard => "standard", + } + ) + } +} + +impl Default for MaskMode { + fn default() -> Self { + Self::Skip + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct WsConf { pub host: String, pub path: String, + pub mask_mode: MaskMode, } impl Display for WsConf { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "host: {}, path: {}", self.host, self.path) + write!( + f, + "host: {}, path: {}, mask: {}", + self.host, self.path, self.mask_mode + ) } } @@ -64,10 +110,12 @@ impl Display for WsConnect where T: Display, { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "[ws]{}", self.conn) } + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "[ws]{}", self.conn) + } } -impl WsConnect { +impl WsConnect { #[inline] pub const fn new(conn: T, conf: WsConf) -> Self { Self { @@ -76,24 +124,6 @@ impl WsConnect { _marker: PhantomData, } } - - #[inline] - pub fn standard(self) -> WsConnect { - WsConnect { - conn: self.conn, - conf: self.conf, - _marker: PhantomData, - } - } - - #[inline] - pub fn fixed(self) -> WsConnect { - WsConnect { - conn: self.conn, - conf: self.conf, - _marker: PhantomData, - } - } } impl AsyncConnect for WsConnect @@ -133,12 +163,16 @@ impl Display for WsAccept where T: Display, { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "[ws]{}", self.lis) } + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "[ws]{}", self.lis) + } } impl WsAccept { #[inline] - pub const fn new(lis: T, conf: WsConf) -> Self { Self { lis, conf } } + pub const fn new(lis: T, conf: WsConf) -> Self { + Self { lis, conf } + } } impl AsyncAccept for WsAccept From 4a20ebee6c1b85b31d57f4b784cb391d95652a31 Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Fri, 23 Dec 2022 23:50:50 -0800 Subject: [PATCH 2/3] docs: correcting README.md --- cmd/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/README.md b/cmd/README.md index 4dd8092..908477b 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -5,11 +5,11 @@ [![downloads](https://img.shields.io/github/downloads/zephyrchien/kaminari/total?color=green)](https://github.com/zephyrchien/kaminari/releases) [![telegram](https://img.shields.io/badge/-telegram-blue?style=flat&color=grey&logo=telegram)](https://t.me/+zKbZTvQE2XtiYmIx) -A blazing fast TCP and UDP tunnel. +A blazing fast TCP, TLS and WebSocket tunnel. ## Intro -- Client side listen on tcp or udp and sends traffics through [tcp/ws/tls/wss] to server side. +- Client side listens on tcp or udp and sends traffics through [tcp/ws/tls/wss] to server side. - Server side receives the traffics then sends them to desginated remote through tcp or udp . From c1665212c1ce4ef84d619268950575aed7a1a367 Mon Sep 17 00:00:00 2001 From: Saber Haj Rabiee Date: Tue, 2 May 2023 14:40:13 -0700 Subject: [PATCH 3/3] fix: a few fixes related to upstream commits --- cmd/README.md | 2 -- cmd/src/client.rs | 8 ++++++-- cmd/src/server.rs | 8 ++++++-- kaminari/Cargo.toml | 2 +- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cmd/README.md b/cmd/README.md index 908477b..a10911c 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -117,8 +117,6 @@ openssl ocsp -issuer \ use `uot` to enable udp over tcp feature. -Note: Currently, `uot` option works independently or with tls options. - ### Examples tcp ⇋ ws --- ws ⇋ tcp: diff --git a/cmd/src/client.rs b/cmd/src/client.rs index ef81b94..cf66fb0 100644 --- a/cmd/src/client.rs +++ b/cmd/src/client.rs @@ -97,7 +97,9 @@ where let buf1 = CopyBuffer::new(buf1.into_boxed_slice()); let buf2 = CopyBuffer::new(buf2.into_boxed_slice()); - bidi_copy_buf(&mut local, &mut remote, buf1, buf2).await.map(|_| ()) + bidi_copy_buf(&mut local, &mut remote, buf1, buf2) + .await + .map(|_| ()) } async fn relay_uot( @@ -119,5 +121,7 @@ where let buf1 = CopyBuffer::new(buf1.into_boxed_slice()); let buf2 = CopyBuffer::new(buf2.into_boxed_slice()); - bidi_copy_buf(&mut local, &mut remote, buf1, buf2).await + bidi_copy_buf(&mut local, &mut remote, buf1, buf2) + .await + .map(|_| ()) } diff --git a/cmd/src/server.rs b/cmd/src/server.rs index 19eed14..b1245f8 100644 --- a/cmd/src/server.rs +++ b/cmd/src/server.rs @@ -96,7 +96,9 @@ where let buf1 = CopyBuffer::new(buf1); let buf2 = CopyBuffer::new(buf2); - bidi_copy_buf(&mut local, &mut remote, buf1, buf2).await + bidi_copy_buf(&mut local, &mut remote, buf1, buf2) + .await + .map(|_| ()) } Streamer::UdpStream(remote) => { println!("{} -> {remote}", local.peer_addr()?); @@ -111,7 +113,9 @@ where let buf1 = CopyBuffer::new(buf1.into_boxed_slice()); let buf2 = CopyBuffer::new(buf2.into_boxed_slice()); - bidi_copy_buf(&mut local, &mut remote, buf1, buf2).await.map(|_| ()) + bidi_copy_buf(&mut local, &mut remote, buf1, buf2) + .await + .map(|_| ()) } } } diff --git a/kaminari/Cargo.toml b/kaminari/Cargo.toml index 320f2bd..0c558d7 100644 --- a/kaminari/Cargo.toml +++ b/kaminari/Cargo.toml @@ -31,7 +31,7 @@ lightws = { version = "0.6", features = [ ], optional = true } # uot -udpflow = { version = "0.1.0", optional = true } +udpflow = { version = "0.1", optional = true } # tls tokio-rustls = { version = "0.24", features = [