Skip to content

Commit

Permalink
Merge pull request #7 from algesten/close
Browse files Browse the repository at this point in the history
close util::Conn connection on .close()
  • Loading branch information
Rain authored Aug 16, 2021
2 parents efba9c9 + 7227a2d commit 744ac18
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 25 deletions.
4 changes: 2 additions & 2 deletions crates/turn/examples/turn_server_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async fn main() -> Result<()> {
let conn = Arc::new(UdpSocket::bind(format!("0.0.0.0:{}", port)).await?);
println!("listening {}...", conn.local_addr()?);

let server = Server::new(ServerConfig {
let mut server = Server::new(ServerConfig {
conn_configs: vec![ConnConfig {
conn,
relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
Expand All @@ -127,7 +127,7 @@ async fn main() -> Result<()> {
println!("Waiting for Ctrl-C...");
signal::ctrl_c().await.expect("failed to listen for event");
println!("\nClosing connection now...");
server.close()?;
server.close().await?;

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,30 @@ async fn test_allocation_timeout() -> Result<()> {
allocations.push(a);
}

tokio::time::sleep(lifetime + Duration::from_millis(100)).await;
let mut count = 0;

for allocation in allocations {
let mut a = allocation.lock().await;
assert!(
a.close().await.is_err(),
"Allocation should be closed if lifetime timeout"
);
}
'outer: loop {
count += 1;

Ok(())
if count >= 10 {
assert!(false, "Allocations didn't timeout");
}

tokio::time::sleep(lifetime + Duration::from_millis(100)).await;

let any_outstanding = false;

for allocation in &allocations {
let mut a = allocation.lock().await;
if !a.close().await.is_err() {
continue 'outer;
}
}

if !any_outstanding {
return Ok(());
}
}
}

#[tokio::test]
Expand Down
3 changes: 3 additions & 0 deletions crates/turn/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ impl Allocation {

log::trace!("allocation with {} closed!", self.five_tuple);

let _ = self.turn_socket.close().await;
let _ = self.relay_socket.close().await;

Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions crates/turn/src/auth/auth_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn test_new_long_term_auth_handler() -> Result<()> {
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let server_port = conn.local_addr()?.port();

let server = Server::new(ServerConfig {
let mut server = Server::new(ServerConfig {
conn_configs: vec![ConnConfig {
conn,
relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
Expand Down Expand Up @@ -97,7 +97,7 @@ async fn test_new_long_term_auth_handler() -> Result<()> {
let _allocation = client.allocate().await?;

client.close().await?;
server.close()?;
server.close().await?;

Ok(())
}
4 changes: 2 additions & 2 deletions crates/turn/src/client/client_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async fn test_client_nonce_expiration() -> Result<()> {
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let server_port = conn.local_addr()?.port();

let server = Server::new(ServerConfig {
let mut server = Server::new(ServerConfig {
conn_configs: vec![ConnConfig {
conn,
relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn test_client_nonce_expiration() -> Result<()> {

// Shutdown
client.close().await?;
server.close()?;
server.close().await?;

Ok(())
}
43 changes: 35 additions & 8 deletions crates/turn/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use request::*;
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{watch, Mutex};
use tokio::time::{Duration, Instant};
use util::Conn;

Expand All @@ -25,18 +25,22 @@ pub struct Server {
realm: String,
channel_bind_timeout: Duration,
pub(crate) nonces: Arc<Mutex<HashMap<String, Instant>>>,
shutdown_tx: Option<watch::Sender<bool>>,
}

impl Server {
// creates the TURN server
pub async fn new(config: ServerConfig) -> Result<Self> {
config.validate()?;

let (shutdown_tx, shutdown_rx) = watch::channel(false);

let mut s = Server {
auth_handler: config.auth_handler,
realm: config.realm,
channel_bind_timeout: config.channel_bind_timeout,
nonces: Arc::new(Mutex::new(HashMap::new())),
shutdown_tx: Some(shutdown_tx),
};

if s.channel_bind_timeout == Duration::from_secs(0) {
Expand All @@ -48,6 +52,7 @@ impl Server {
let auth_handler = Arc::clone(&s.auth_handler);
let realm = s.realm.clone();
let channel_bind_timeout = s.channel_bind_timeout;
let shutdown_rx = shutdown_rx.clone();

tokio::spawn(async move {
let allocation_manager = Arc::new(Manager::new(ManagerConfig {
Expand All @@ -61,6 +66,7 @@ impl Server {
auth_handler,
realm,
channel_bind_timeout,
shutdown_rx,
)
.await;
});
Expand All @@ -76,16 +82,29 @@ impl Server {
auth_handler: Arc<Box<dyn AuthHandler + Send + Sync>>,
realm: String,
channel_bind_timeout: Duration,
mut shutdown_rx: watch::Receiver<bool>,
) {
let mut buf = vec![0u8; INBOUND_MTU];

loop {
//TODO: gracefully exit loop
let (n, addr) = match conn.recv_from(&mut buf).await {
Ok((n, addr)) => (n, addr),
Err(err) => {
log::debug!("exit read loop on error: {}", err);
break;
let (n, addr) = tokio::select! {
v = conn.recv_from(&mut buf) => {
match v {
Ok(v) => v,
Err(err) => {
log::debug!("exit read loop on error: {}", err);
break;
}
}
},
did_change = shutdown_rx.changed() => {
if did_change.is_err() || *shutdown_rx.borrow() {
// if did_change.is_err, sender was dropped, or if
// bool is set to true, that means we're shutting down.
break
} else {
continue;
}
}
};

Expand All @@ -106,10 +125,18 @@ impl Server {
}

let _ = allocation_manager.close().await;
let _ = conn.close().await;
}

// Close stops the TURN Server. It cleans up any associated state and closes all connections it is managing
pub fn close(&self) -> Result<()> {
pub async fn close(&mut self) -> Result<()> {
if let Some(tx) = self.shutdown_tx.take() {
// errors if there are no receivers, but that's irrelevant.
let _ = tx.send(true);
// wait for all receivers to drop/close.
let _ = tx.closed().await;
}

Ok(())
}
}
4 changes: 2 additions & 2 deletions crates/turn/src/server/server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn test_server_simple() -> Result<()> {
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let server_port = conn.local_addr()?.port();

let server = Server::new(ServerConfig {
let mut server = Server::new(ServerConfig {
conn_configs: vec![ConnConfig {
conn,
relay_addr_generator: Box::new(RelayAddressGeneratorStatic {
Expand Down Expand Up @@ -88,7 +88,7 @@ async fn test_server_simple() -> Result<()> {
.await?;

client.close().await?;
server.close()?;
server.close().await?;

Ok(())
}
Expand Down

0 comments on commit 744ac18

Please sign in to comment.