Skip to content

Commit

Permalink
Wrap the parent_rtcp_reader in a fresh object per call to bind_rtcp_r…
Browse files Browse the repository at this point in the history
…eader to ensure multiple calls to an interceptor don't trample previous state
  • Loading branch information
robashton committed Jan 13, 2022
1 parent 89b0dc7 commit 89409d2
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 44 deletions.
35 changes: 13 additions & 22 deletions crates/interceptor/src/nack/responder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
};
use responder_stream::ResponderStream;

use crate::error::{Error, Result};
use crate::error::Result;
use crate::nack::stream_support_nack;

use async_trait::async_trait;
Expand Down Expand Up @@ -44,7 +44,6 @@ impl InterceptorBuilder for ResponderBuilder {
13 // 8192 = 1 << 13
},
streams: Arc::new(Mutex::new(HashMap::new())),
parent_rtcp_reader: Mutex::new(None),
}),
}))
}
Expand All @@ -53,7 +52,6 @@ impl InterceptorBuilder for ResponderBuilder {
pub struct ResponderInternal {
log2_size: u8,
streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
parent_rtcp_reader: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
}

impl ResponderInternal {
Expand Down Expand Up @@ -92,27 +90,22 @@ impl ResponderInternal {
}
}

pub struct ResponderRtcpReader {
parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
internal: Arc<ResponderInternal>,
}

#[async_trait]
impl RTCPReader for ResponderInternal {
impl RTCPReader for ResponderRtcpReader {
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
let (n, attr) = {
let parent_rtcp_reader = {
let parent_rtcp_reader = self.parent_rtcp_reader.lock().await;
parent_rtcp_reader.clone()
};
if let Some(reader) = parent_rtcp_reader {
reader.read(buf, a).await?
} else {
return Err(Error::ErrInvalidParentRtcpReader);
}
};
let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? };

let mut b = &buf[..n];
let pkts = rtcp::packet::unmarshal(&mut b)?;
for p in &pkts {
if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
let nack = nack.clone();
let streams = Arc::clone(&self.streams);
let streams = Arc::clone(&self.internal.streams);
tokio::spawn(async move {
ResponderInternal::resend_packets(streams, nack).await;
});
Expand Down Expand Up @@ -143,12 +136,10 @@ impl Interceptor for Responder {
&self,
reader: Arc<dyn RTCPReader + Send + Sync>,
) -> Arc<dyn RTCPReader + Send + Sync> {
{
let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
*parent_rtcp_reader = Some(reader);
}

Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
Arc::new(ResponderRtcpReader {
internal: Arc::clone(&self.internal),
parent_rtcp_reader: reader,
}) as Arc<dyn RTCPReader + Send + Sync>
}

/// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
Expand Down
1 change: 0 additions & 1 deletion crates/interceptor/src/report/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ impl ReportBuilder {
Duration::from_secs(1)
},
now: self.now.clone(),
parent_rtcp_reader: Mutex::new(None),
streams: Mutex::new(HashMap::new()),
close_rx: Mutex::new(Some(close_rx)),
}),
Expand Down
34 changes: 13 additions & 21 deletions crates/interceptor/src/report/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,24 @@ use waitgroup::WaitGroup;
pub(crate) struct ReceiverReportInternal {
pub(crate) interval: Duration,
pub(crate) now: Option<FnTimeGen>,
pub(crate) parent_rtcp_reader: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
pub(crate) streams: Mutex<HashMap<u32, Arc<ReceiverStream>>>,
pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
}

pub(crate) struct ReceiverReportRtcpReader {
pub(crate) internal: Arc<ReceiverReportInternal>,
pub(crate) parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
}

#[async_trait]
impl RTCPReader for ReceiverReportInternal {
impl RTCPReader for ReceiverReportRtcpReader {
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
let (n, attr) = {
let parent_rtcp_reader = {
let parent_rtcp_reader = self.parent_rtcp_reader.lock().await;
parent_rtcp_reader.clone()
};
if let Some(reader) = parent_rtcp_reader {
reader.read(buf, a).await?
} else {
return Err(Error::ErrInvalidParentRtcpReader);
}
};
let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? };

let mut b = &buf[..n];
let pkts = rtcp::packet::unmarshal(&mut b)?;

let now = if let Some(f) = &self.now {
let now = if let Some(f) = &self.internal.now {
f().await
} else {
SystemTime::now()
Expand All @@ -50,7 +44,7 @@ impl RTCPReader for ReceiverReportInternal {
.downcast_ref::<rtcp::sender_report::SenderReport>()
{
let stream = {
let m = self.streams.lock().await;
let m = self.internal.streams.lock().await;
m.get(&sr.ssrc).cloned()
};
if let Some(stream) = stream {
Expand Down Expand Up @@ -136,12 +130,10 @@ impl Interceptor for ReceiverReport {
&self,
reader: Arc<dyn RTCPReader + Send + Sync>,
) -> Arc<dyn RTCPReader + Send + Sync> {
{
let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
*parent_rtcp_reader = Some(reader);
}

Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
Arc::new(ReceiverReportRtcpReader {
internal: Arc::clone(&self.internal),
parent_rtcp_reader: reader,
})
}

/// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
Expand Down

0 comments on commit 89409d2

Please sign in to comment.