diff --git a/crates/interceptor/src/nack/responder/mod.rs b/crates/interceptor/src/nack/responder/mod.rs index ed0a75c07..48bbfbf85 100644 --- a/crates/interceptor/src/nack/responder/mod.rs +++ b/crates/interceptor/src/nack/responder/mod.rs @@ -8,7 +8,7 @@ use crate::{ }; use responder_stream::ResponderStream; -use crate::error::{Error, Result}; +use crate::error::Result; use crate::nack::stream_support_nack; use async_trait::async_trait; @@ -44,7 +44,6 @@ impl InterceptorBuilder for ResponderBuilder { 13 // 8192 = 1 << 13 }, streams: Arc::new(Mutex::new(HashMap::new())), - parent_rtcp_reader: Mutex::new(None), }), })) } @@ -53,7 +52,6 @@ impl InterceptorBuilder for ResponderBuilder { pub struct ResponderInternal { log2_size: u8, streams: Arc>>>, - parent_rtcp_reader: Mutex>>, } impl ResponderInternal { @@ -92,27 +90,22 @@ impl ResponderInternal { } } +pub struct ResponderRtcpReader { + parent_rtcp_reader: Arc, + internal: Arc, +} + #[async_trait] -impl RTCPReader for ResponderInternal { +impl RTCPReader for ResponderRtcpReader { async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> { - let (n, attr) = { - let parent_rtcp_reader = { - let parent_rtcp_reader = self.parent_rtcp_reader.lock().await; - parent_rtcp_reader.clone() - }; - if let Some(reader) = parent_rtcp_reader { - reader.read(buf, a).await? - } else { - return Err(Error::ErrInvalidParentRtcpReader); - } - }; + let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? }; let mut b = &buf[..n]; let pkts = rtcp::packet::unmarshal(&mut b)?; for p in &pkts { if let Some(nack) = p.as_any().downcast_ref::() { let nack = nack.clone(); - let streams = Arc::clone(&self.streams); + let streams = Arc::clone(&self.internal.streams); tokio::spawn(async move { ResponderInternal::resend_packets(streams, nack).await; }); @@ -143,12 +136,10 @@ impl Interceptor for Responder { &self, reader: Arc, ) -> Arc { - { - let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await; - *parent_rtcp_reader = Some(reader); - } - - Arc::clone(&self.internal) as Arc + Arc::new(ResponderRtcpReader { + internal: Arc::clone(&self.internal), + parent_rtcp_reader: reader, + }) as Arc } /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method diff --git a/crates/interceptor/src/report/mod.rs b/crates/interceptor/src/report/mod.rs index ff440f0fc..4690639e0 100644 --- a/crates/interceptor/src/report/mod.rs +++ b/crates/interceptor/src/report/mod.rs @@ -44,7 +44,6 @@ impl ReportBuilder { Duration::from_secs(1) }, now: self.now.clone(), - parent_rtcp_reader: Mutex::new(None), streams: Mutex::new(HashMap::new()), close_rx: Mutex::new(Some(close_rx)), }), diff --git a/crates/interceptor/src/report/receiver/mod.rs b/crates/interceptor/src/report/receiver/mod.rs index 902a04f13..ed20b5f41 100644 --- a/crates/interceptor/src/report/receiver/mod.rs +++ b/crates/interceptor/src/report/receiver/mod.rs @@ -15,30 +15,24 @@ use waitgroup::WaitGroup; pub(crate) struct ReceiverReportInternal { pub(crate) interval: Duration, pub(crate) now: Option, - pub(crate) parent_rtcp_reader: Mutex>>, pub(crate) streams: Mutex>>, pub(crate) close_rx: Mutex>>, } +pub(crate) struct ReceiverReportRtcpReader { + pub(crate) internal: Arc, + pub(crate) parent_rtcp_reader: Arc, +} + #[async_trait] -impl RTCPReader for ReceiverReportInternal { +impl RTCPReader for ReceiverReportRtcpReader { async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> { - let (n, attr) = { - let parent_rtcp_reader = { - let parent_rtcp_reader = self.parent_rtcp_reader.lock().await; - parent_rtcp_reader.clone() - }; - if let Some(reader) = parent_rtcp_reader { - reader.read(buf, a).await? - } else { - return Err(Error::ErrInvalidParentRtcpReader); - } - }; + let (n, attr) = { self.parent_rtcp_reader.read(buf, a).await? }; let mut b = &buf[..n]; let pkts = rtcp::packet::unmarshal(&mut b)?; - let now = if let Some(f) = &self.now { + let now = if let Some(f) = &self.internal.now { f().await } else { SystemTime::now() @@ -50,7 +44,7 @@ impl RTCPReader for ReceiverReportInternal { .downcast_ref::() { let stream = { - let m = self.streams.lock().await; + let m = self.internal.streams.lock().await; m.get(&sr.ssrc).cloned() }; if let Some(stream) = stream { @@ -136,12 +130,10 @@ impl Interceptor for ReceiverReport { &self, reader: Arc, ) -> Arc { - { - let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await; - *parent_rtcp_reader = Some(reader); - } - - Arc::clone(&self.internal) as Arc + Arc::new(ReceiverReportRtcpReader { + internal: Arc::clone(&self.internal), + parent_rtcp_reader: reader, + }) } /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method