Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add alloc_close_notify config param. #421

Merged
merged 3 commits into from
May 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ice/src/agent/agent_vnet_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ pub(crate) async fn add_vnet_stun(wan_net: Arc<net::Net>) -> Result<turn::server
realm: "webrtc.rs".to_owned(),
auth_handler: Arc::new(TestAuthHandler::new()),
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify: None,
})
.await?;

Expand Down
1 change: 1 addition & 0 deletions ice/src/candidate/candidate_relay_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async fn test_relay_only_connection() -> Result<(), Error> {
}),
}],
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify: None,
})
.await?;

Expand Down
1 change: 1 addition & 0 deletions ice/src/candidate/candidate_server_reflexive_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async fn test_server_reflexive_only_connection() -> Result<()> {
}),
}],
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify: None,
})
.await?;

Expand Down
1 change: 1 addition & 0 deletions turn/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

* [#330 Fix the problem that the UDP port of the server relay is not released](https://github.com/webrtc-rs/webrtc/pull/330) by [@clia](https://github.com/clia).
* Added `alloc_close_notify` config parameter to `ServerConfig` and `Allocation`, to receive notify on allocation close event, with metrics data.

## v0.6.1

Expand Down
1 change: 1 addition & 0 deletions turn/examples/turn_server_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ async fn main() -> Result<(), Error> {
realm: realm.to_owned(),
auth_handler: Arc::new(MyAuthHandler::new(cred_map)),
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify: None,
})
.await?;

Expand Down
13 changes: 12 additions & 1 deletion turn/src/allocation/allocation_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ use crate::relay::*;
use futures::future;
use std::collections::HashMap;
use stun::textattrs::Username;
use tokio::sync::mpsc;
use util::Conn;

// ManagerConfig a bag of config params for Manager.
pub struct ManagerConfig {
pub relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
pub alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
}

// Manager is used to hold active allocations
pub struct Manager {
allocations: AllocationMap,
reservations: Arc<Mutex<HashMap<String, u16>>>,
relay_addr_generator: Box<dyn RelayAddressGenerator + Send + Sync>,
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
}

impl Manager {
Expand All @@ -29,6 +32,7 @@ impl Manager {
allocations: Arc::new(Mutex::new(HashMap::new())),
reservations: Arc::new(Mutex::new(HashMap::new())),
relay_addr_generator: config.relay_addr_generator,
alloc_close_notify: config.alloc_close_notify,
}
}

Expand Down Expand Up @@ -95,7 +99,14 @@ impl Manager {
.relay_addr_generator
.allocate_conn(true, requested_port)
.await?;
let mut a = Allocation::new(turn_socket, relay_socket, relay_addr, five_tuple, username);
let mut a = Allocation::new(
turn_socket,
relay_socket,
relay_addr,
five_tuple,
username,
self.alloc_close_notify.clone(),
);
a.allocations = Some(Arc::clone(&self.allocations));

log::debug!("listening on relay addr: {:?}", a.relay_addr);
Expand Down
53 changes: 50 additions & 3 deletions turn/src/allocation/allocation_manager/allocation_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
};
use stun::{attributes::ATTR_USERNAME, textattrs::TextAttribute};
use tokio::net::UdpSocket;
use tokio::sync::mpsc::Sender;
use util::vnet::net::*;

fn new_test_manager() -> Manager {
Expand All @@ -26,6 +27,7 @@ fn new_test_manager() -> Manager {
address: "0.0.0.0".to_owned(),
net: Arc::new(Net::new(None)),
}),
alloc_close_notify: None,
};
Manager::new(config)
}
Expand Down Expand Up @@ -395,7 +397,9 @@ impl AuthHandler for TestAuthHandler {
}
}

async fn create_server() -> Result<(Server, u16)> {
async fn create_server(
alloc_close_notify: Option<Sender<AllocationInfo>>,
) -> Result<(Server, u16)> {
let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
let server_port = conn.local_addr()?.port();

Expand All @@ -411,6 +415,7 @@ async fn create_server() -> Result<(Server, u16)> {
realm: "webrtc.rs".to_owned(),
auth_handler: Arc::new(TestAuthHandler {}),
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify,
})
.await?;

Expand All @@ -437,7 +442,7 @@ async fn create_client(username: String, server_port: u16) -> Result<Client> {
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_get_allocations_info() -> Result<()> {
let (server, server_port) = create_server().await?;
let (server, server_port) = create_server(None).await?;

let client1 = create_client("user1".to_owned(), server_port).await?;
client1.listen().await?;
Expand Down Expand Up @@ -489,7 +494,7 @@ async fn test_get_allocations_info() -> Result<()> {
#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_get_allocations_info_bytes_count() -> Result<()> {
let (server, server_port) = create_server().await?;
let (server, server_port) = create_server(None).await?;

let client = create_client("foo".to_owned(), server_port).await?;

Expand Down Expand Up @@ -558,3 +563,45 @@ async fn test_get_allocations_info_bytes_count() -> Result<()> {

Ok(())
}

#[cfg(feature = "metrics")]
#[tokio::test]
async fn test_alloc_close_notify() -> Result<()> {
let (tx, mut rx) = mpsc::channel::<AllocationInfo>(1);

tokio::spawn(async move {
if let Some(alloc) = rx.recv().await {
assert_eq!(alloc.relayed_bytes, 50);
}
});

let (server, server_port) = create_server(Some(tx)).await?;

let client = create_client("foo".to_owned(), server_port).await?;

client.listen().await?;

assert!(server.get_allocations_info(None).await?.is_empty());

let conn = client.allocate().await?;
let addr = client
.send_binding_request_to(format!("127.0.0.1:{server_port}").as_str())
.await?;

assert!(!server.get_allocations_info(None).await?.is_empty());

for _ in 0..10 {
conn.send_to(b"Hello", addr).await?;

tokio::time::sleep(Duration::from_millis(100)).await;
}

tokio::time::sleep(Duration::from_millis(1000)).await;

client.close().await?;
server.close().await?;

tokio::time::sleep(Duration::from_millis(1000)).await;

Ok(())
}
9 changes: 9 additions & 0 deletions turn/src/allocation/allocation_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async fn test_has_permission() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

let addr1 = SocketAddr::from_str("127.0.0.1:3478")?;
Expand Down Expand Up @@ -53,6 +54,7 @@ async fn test_add_permission() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

let addr = SocketAddr::from_str("127.0.0.1:3478")?;
Expand All @@ -76,6 +78,7 @@ async fn test_remove_permission() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

let addr = SocketAddr::from_str("127.0.0.1:3478")?;
Expand Down Expand Up @@ -108,6 +111,7 @@ async fn test_add_channel_bind() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

let addr = SocketAddr::from_str("127.0.0.1:3478")?;
Expand Down Expand Up @@ -141,6 +145,7 @@ async fn test_get_channel_by_number() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

let addr = SocketAddr::from_str("127.0.0.1:3478")?;
Expand Down Expand Up @@ -176,6 +181,7 @@ async fn test_get_channel_by_addr() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

let addr = SocketAddr::from_str("127.0.0.1:3478")?;
Expand Down Expand Up @@ -207,6 +213,7 @@ async fn test_remove_channel_bind() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

let addr = SocketAddr::from_str("127.0.0.1:3478")?;
Expand Down Expand Up @@ -243,6 +250,7 @@ async fn test_allocation_refresh() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

a.start(DEFAULT_LIFETIME).await;
Expand All @@ -264,6 +272,7 @@ async fn test_allocation_close() -> Result<()> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

// add mock lifetimeTimer
Expand Down
1 change: 1 addition & 0 deletions turn/src/allocation/channel_bind/channel_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async fn create_channel_bind(lifetime: Duration) -> Result<Allocation> {
relay_addr,
FiveTuple::default(),
TextAttribute::new(ATTR_USERNAME, "user".into()),
None,
);

let addr = SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0);
Expand Down
14 changes: 14 additions & 0 deletions turn/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct Allocation {
closed: AtomicBool, // Option<mpsc::Receiver<()>>,
pub(crate) relayed_bytes: AtomicUsize,
drop_tx: Option<Sender<u32>>,
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
}

fn addr2ipfingerprint(addr: &SocketAddr) -> String {
Expand All @@ -97,6 +98,7 @@ impl Allocation {
relay_addr: SocketAddr,
five_tuple: FiveTuple,
username: Username,
alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
) -> Self {
Allocation {
protocol: PROTO_UDP,
Expand All @@ -113,6 +115,7 @@ impl Allocation {
closed: AtomicBool::new(false),
relayed_bytes: Default::default(),
drop_tx: None,
alloc_close_notify,
}
}

Expand Down Expand Up @@ -246,6 +249,17 @@ impl Allocation {
let _ = self.turn_socket.close().await;
let _ = self.relay_socket.close().await;

if let Some(notify_tx) = &self.alloc_close_notify {
let _ = notify_tx
.send(AllocationInfo {
five_tuple: self.five_tuple,
username: self.username.text.clone(),
#[cfg(feature = "metrics")]
relayed_bytes: self.relayed_bytes.load(Ordering::Acquire),
})
.await;
}

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions turn/src/auth/auth_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ async fn test_new_long_term_auth_handler() -> Result<()> {
realm: "webrtc.rs".to_owned(),
auth_handler: Arc::new(LongTermAuthHandler::new(SHARED_SECRET.to_string())),
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify: None,
})
.await?;

Expand Down
1 change: 1 addition & 0 deletions turn/src/client/client_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ async fn test_client_nonce_expiration() -> Result<()> {
realm: "webrtc.rs".to_owned(),
auth_handler: Arc::new(TestAuthHandler {}),
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify: None,
})
.await?;

Expand Down
5 changes: 5 additions & 0 deletions turn/src/server/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::allocation::*;
use crate::auth::*;
use crate::error::*;
use crate::relay::*;

use util::Conn;

use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::Duration;

// ConnConfig is used for UDP listeners
Expand Down Expand Up @@ -36,6 +38,9 @@ pub struct ServerConfig {

// channel_bind_timeout sets the lifetime of channel binding. Defaults to 10 minutes.
pub channel_bind_timeout: Duration,

// to receive notify on allocation close event, with metrics data.
pub alloc_close_notify: Option<mpsc::Sender<AllocationInfo>>,
}

impl ServerConfig {
Expand Down
1 change: 1 addition & 0 deletions turn/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl Server {
let conn = p.conn;
let allocation_manager = Arc::new(Manager::new(ManagerConfig {
relay_addr_generator: p.relay_addr_generator,
alloc_close_notify: config.alloc_close_notify.clone(),
}));

tokio::spawn(Server::read_loop(
Expand Down
1 change: 1 addition & 0 deletions turn/src/server/request/request_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async fn test_allocation_lifetime_deletion_zero_lifetime() -> Result<()> {
address: "0.0.0.0".to_owned(),
net: Arc::new(Net::new(None)),
}),
alloc_close_notify: None,
}));

let socket = SocketAddr::new(IpAddr::from_str("127.0.0.1")?, 5000);
Expand Down
2 changes: 2 additions & 0 deletions turn/src/server/server_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ async fn test_server_simple() -> Result<()> {
realm: "webrtc.rs".to_owned(),
auth_handler: Arc::new(TestAuthHandler::new()),
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify: None,
})
.await?;

Expand Down Expand Up @@ -192,6 +193,7 @@ async fn build_vnet() -> Result<VNet> {
realm: "webrtc.rs".to_owned(),
auth_handler: Arc::new(TestAuthHandler::new()),
channel_bind_timeout: Duration::from_secs(0),
alloc_close_notify: None,
})
.await?;

Expand Down