Skip to content

Commit

Permalink
perf: replace Mutex with RwLock
Browse files Browse the repository at this point in the history
  • Loading branch information
Itsusinn committed Mar 23, 2024
1 parent e2beca8 commit 027dc87
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[workspace]
members = ["tuic", "tuic-quinn", "tuic-server", "tuic-client"]
resolver = "2"

[profile.release]
opt-level = 3
Expand Down
1 change: 0 additions & 1 deletion tuic-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ humantime = { version = "2", default-features = false }
lexopt = { version = "0.3", default-features = false }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
once_cell = { version = "1", default-features = false, features = ["parking_lot", "std"] }
parking_lot = { version = "0.12", default-features = false, features = ["send_guard"] }
quinn = { version = "0.10", default-features = false, features = ["futures-io", "runtime-tokio", "tls-rustls"] }
register-count = { version = "0.1", default-features = false, features = ["std"] }
rustls = { version = "0.21", default-features = false, features = ["quic"] }
Expand Down
3 changes: 2 additions & 1 deletion tuic-client/src/connection/handle_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ impl Connection {
let session = SOCKS5_UDP_SESSIONS
.get()
.unwrap()
.lock()
.read()
.await
.get(&assoc_id)
.cloned();

Expand Down
25 changes: 12 additions & 13 deletions tuic-client/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
};
use crossbeam_utils::atomic::AtomicCell;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use quinn::{
congestion::{BbrConfig, CubicConfig, NewRenoConfig},
ClientConfig, Connection as QuinnConnection, Endpoint as QuinnEndpoint, EndpointConfig,
Expand All @@ -18,18 +17,17 @@ use std::{
sync::{atomic::AtomicU32, Arc},
time::Duration,
};
use tokio::{
sync::{Mutex as AsyncMutex, OnceCell as AsyncOnceCell},
time,
};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::{sync::OnceCell as AsyncOnceCell, time};

use tuic_quinn::{side, Connection as Model};
use uuid::Uuid;

mod handle_stream;
mod handle_task;

static ENDPOINT: OnceCell<Mutex<Endpoint>> = OnceCell::new();
static CONNECTION: AsyncOnceCell<AsyncMutex<Connection>> = AsyncOnceCell::const_new();
static ENDPOINT: OnceCell<AsyncRwLock<Endpoint>> = OnceCell::new();
static CONNECTION: AsyncOnceCell<AsyncRwLock<Connection>> = AsyncOnceCell::const_new();
static TIMEOUT: AtomicCell<Duration> = AtomicCell::new(Duration::from_secs(0));

pub const ERROR_CODE: VarInt = VarInt::from_u32(0);
Expand Down Expand Up @@ -117,7 +115,7 @@ impl Connection {
};

ENDPOINT
.set(Mutex::new(ep))
.set(AsyncRwLock::new(ep))
.map_err(|_| "endpoint already initialized")
.unwrap();

Expand All @@ -131,21 +129,22 @@ impl Connection {
ENDPOINT
.get()
.unwrap()
.lock()
.read()
.await
.connect()
.await
.map(AsyncMutex::new)
.map(AsyncRwLock::new)
};

let try_get_conn = async {
let mut conn = CONNECTION
.get_or_try_init(|| try_init_conn)
.await?
.lock()
.write()
.await;

if conn.is_closed() {
let new_conn = ENDPOINT.get().unwrap().lock().connect().await?;
let new_conn = ENDPOINT.get().unwrap().read().await.connect().await?;
*conn = new_conn;
}

Expand Down Expand Up @@ -254,7 +253,7 @@ struct Endpoint {
}

impl Endpoint {
async fn connect(&mut self) -> Result<Connection, Error> {
async fn connect(&self) -> Result<Connection, Error> {
let mut last_err = None;

for addr in self.server.resolve().await? {
Expand Down
6 changes: 4 additions & 2 deletions tuic-client/src/socks5/handle_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ impl Server {
UDP_SESSIONS
.get()
.unwrap()
.lock()
.write()
.await
.insert(assoc_id, session.clone());

let handle_local_incoming_pkt = async move {
Expand Down Expand Up @@ -95,7 +96,8 @@ impl Server {
UDP_SESSIONS
.get()
.unwrap()
.lock()
.write()
.await
.remove(&assoc_id)
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions tuic-client/src/socks5/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{config::Local, error::Error};
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use socks5_server::{
auth::{NoAuth, Password},
Expand All @@ -15,6 +14,7 @@ use std::{
},
};
use tokio::net::TcpListener;
use tokio::sync::RwLock as AsyncRwLock;

mod handle_task;
mod udp_session;
Expand Down Expand Up @@ -44,7 +44,7 @@ impl Server {
.unwrap();

UDP_SESSIONS
.set(Mutex::new(HashMap::new()))
.set(AsyncRwLock::new(HashMap::new()))
.map_err(|_| "failed initializing socks5 UDP session pool")
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions tuic-client/src/socks5/udp_session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::error::Error;
use bytes::Bytes;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use socks5_proto::Address;
use socks5_server::AssociatedUdpSocket;
Expand All @@ -12,8 +11,9 @@ use std::{
sync::Arc,
};
use tokio::net::UdpSocket;
use tokio::sync::RwLock as AsyncRwLock;

pub static UDP_SESSIONS: OnceCell<Mutex<HashMap<u16, UdpSession>>> = OnceCell::new();
pub static UDP_SESSIONS: OnceCell<AsyncRwLock<HashMap<u16, UdpSession>>> = OnceCell::new();

#[derive(Clone)]
pub struct UdpSession {
Expand Down
2 changes: 1 addition & 1 deletion tuic-server/src/connection/authenticated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crossbeam_utils::atomic::AtomicCell;
use std::{
fmt::{Display, Formatter, Result as FmtResult},
ops::Deref,
sync::Arc
sync::Arc,
};
use tokio::sync::broadcast::Sender;
use tokio::sync::RwLock as AsyncRwLock;
Expand Down
34 changes: 20 additions & 14 deletions tuic-server/src/connection/handle_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,24 @@ impl Connection {
src_addr = addr,
);

let session = match self.udp_sessions.lock().entry(assoc_id) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => {
let session = UdpSession::new(
self.clone(),
assoc_id,
self.udp_relay_ipv6,
self.max_external_pkt_size,
)?;
entry.insert(session.clone());
session
}
let guard = self.udp_sessions.read().await;
let session = guard.get(&assoc_id).map(|v| v.to_owned());
drop(guard);
let session = match session {
Some(v) => v,
None => match self.udp_sessions.write().await.entry(assoc_id) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => {
let session = UdpSession::new(
self.clone(),
assoc_id,
self.udp_relay_ipv6,
self.max_external_pkt_size,
)?;
entry.insert(session.clone());
session
}
},
};

let Some(socket_addr) = resolve_dns(&addr).await?.next() else {
Expand Down Expand Up @@ -162,8 +168,8 @@ impl Connection {
user = self.auth,
);

if let Some(session) = self.udp_sessions.lock().remove(&assoc_id) {
session.close();
if let Some(session) = self.udp_sessions.write().await.remove(&assoc_id) {
session.close().await;
}
}

Expand Down
10 changes: 5 additions & 5 deletions tuic-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use self::{authenticated::Authenticated, udp_session::UdpSession};
use crate::{error::Error, utils::UdpRelayMode};
use crossbeam_utils::atomic::AtomicCell;
use parking_lot::Mutex;
use quinn::{Connecting, Connection as QuinnConnection, VarInt};
use register_count::Counter;
use std::{
collections::HashMap,
sync::{atomic::AtomicU32, Arc},
time::Duration,
};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::time;
use tuic_quinn::{side, Authenticate, Connection as Model};
use uuid::Uuid;
Expand All @@ -29,7 +29,7 @@ pub struct Connection {
udp_relay_ipv6: bool,
auth: Authenticated,
task_negotiation_timeout: Duration,
udp_sessions: Arc<Mutex<HashMap<u16, UdpSession>>>,
udp_sessions: Arc<AsyncRwLock<HashMap<u16, UdpSession>>>,
udp_relay_mode: Arc<AtomicCell<Option<UdpRelayMode>>>,
max_external_pkt_size: usize,
remote_uni_stream_cnt: Counter,
Expand Down Expand Up @@ -147,7 +147,7 @@ impl Connection {
udp_relay_ipv6,
auth: Authenticated::new(),
task_negotiation_timeout,
udp_sessions: Arc::new(Mutex::new(HashMap::new())),
udp_sessions: Arc::new(AsyncRwLock::new(HashMap::new())),
udp_relay_mode: Arc::new(AtomicCell::new(None)),
max_external_pkt_size,
remote_uni_stream_cnt: Counter::new(),
Expand All @@ -157,15 +157,15 @@ impl Connection {
}
}

fn authenticate(&self, auth: &Authenticate) -> Result<(), Error> {
async fn authenticate(&self, auth: &Authenticate) -> Result<(), Error> {
if self.auth.get().is_some() {
Err(Error::DuplicatedAuth)
} else if self
.users
.get(&auth.uuid())
.map_or(false, |password| auth.validate(password))
{
self.auth.set(auth.uuid());
self.auth.set(auth.uuid()).await;
Ok(())
} else {
Err(Error::AuthFailed(auth.uuid()))
Expand Down
10 changes: 5 additions & 5 deletions tuic-server/src/connection/udp_session.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use super::Connection;
use crate::error::Error;
use bytes::Bytes;
use parking_lot::Mutex;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use std::{
io::Error as IoError,
net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket as StdUdpSocket},
sync::Arc,
};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::{
net::UdpSocket,
sync::oneshot::{self, Sender},
Expand All @@ -23,7 +23,7 @@ struct UdpSessionInner {
socket_v4: UdpSocket,
socket_v6: Option<UdpSocket>,
max_pkt_size: usize,
close: Mutex<Option<Sender<()>>>,
close: AsyncRwLock<Option<Sender<()>>>,
}

impl UdpSession {
Expand Down Expand Up @@ -89,7 +89,7 @@ impl UdpSession {
socket_v4,
socket_v6,
max_pkt_size,
close: Mutex::new(Some(tx)),
close: AsyncRwLock::new(Some(tx)),
}));

let session_listening = session.clone();
Expand Down Expand Up @@ -161,7 +161,7 @@ impl UdpSession {
}
}

pub fn close(&self) {
let _ = self.0.close.lock().take().unwrap().send(());
pub async fn close(&self) {
let _ = self.0.close.write().await.take().unwrap().send(());
}
}

0 comments on commit 027dc87

Please sign in to comment.