Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TURN] Fix server relay UDP port not release problem. #330

Merged
merged 9 commits into from
Nov 22, 2022
2 changes: 2 additions & 0 deletions turn/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

* [#330 Fix the problem that the UDP port of the server relay is not released](https://github.com/webrtc-rs/webrtc/pull/330) by [@clia](https://github.com/clia).

## v0.6.1

* Added `delete_allocations_by_username` method on `Server`. This method provides possibility to manually delete allocation [#263](https://github.com/webrtc-rs/webrtc/pull/263) by [@logist322](https://github.com/logist322).
Expand Down
34 changes: 26 additions & 8 deletions turn/src/allocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ use std::{
sync::{atomic::AtomicBool, atomic::Ordering, Arc},
};
use tokio::{
sync::{mpsc, Mutex},
sync::{
mpsc,
oneshot::{self, Sender},
Mutex,
},
time::{Duration, Instant},
};

Expand Down Expand Up @@ -78,6 +82,7 @@ pub struct Allocation {
timer_expired: Arc<AtomicBool>,
closed: AtomicBool, // Option<mpsc::Receiver<()>>,
pub(crate) relayed_bytes: AtomicUsize,
drop_tx: Option<Sender<u32>>,
}

fn addr2ipfingerprint(addr: &SocketAddr) -> String {
Expand Down Expand Up @@ -107,6 +112,7 @@ impl Allocation {
timer_expired: Arc::new(AtomicBool::new(false)),
closed: AtomicBool::new(false),
relayed_bytes: Default::default(),
drop_tx: None,
}
}

Expand Down Expand Up @@ -313,26 +319,38 @@ impl Allocation {
// datagram, and the XOR-PEER-ADDRESS attribute is set to the source
// transport address of the received UDP datagram. The Data indication
// is then sent on the 5-tuple associated with the allocation.
async fn packet_handler(&self) {
async fn packet_handler(&mut self) {
let five_tuple = self.five_tuple;
let relay_addr = self.relay_addr;
let relay_socket = Arc::clone(&self.relay_socket);
let turn_socket = Arc::clone(&self.turn_socket);
let allocations = self.allocations.clone();
let channel_bindings = Arc::clone(&self.channel_bindings);
let permissions = Arc::clone(&self.permissions);
let (drop_tx, drop_rx) = oneshot::channel::<u32>();
self.drop_tx = Some(drop_tx);

tokio::spawn(async move {
let mut buffer = vec![0u8; RTP_MTU];

tokio::pin!(drop_rx);

loop {
let (n, src_addr) = match relay_socket.recv_from(&mut buffer).await {
Ok((n, src_addr)) => (n, src_addr),
Err(_) => {
if let Some(allocs) = &allocations {
let mut alls = allocs.lock().await;
alls.remove(&five_tuple);
let (n, src_addr) = tokio::select! {
result = relay_socket.recv_from(&mut buffer) => {
match result {
Ok((n, src_addr)) => (n, src_addr),
Err(_) => {
if let Some(allocs) = &allocations {
let mut alls = allocs.lock().await;
alls.remove(&five_tuple);
}
break;
}
}
}
_ = drop_rx.as_mut() => {
log::trace!("allocation has stopped, stop packet_handler. five_tuple: {:?}", five_tuple);
break;
}
};
Expand Down