Skip to content

Commit

Permalink
NACK responder: bypass auxiliary SSRCs
Browse files Browse the repository at this point in the history
  • Loading branch information
aalekseevx committed Jan 31, 2025
1 parent e187410 commit ad324ac
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 3 deletions.
5 changes: 5 additions & 0 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
n.streamsMu.Unlock()

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
// If this packet doesn't belong to the main SSRC, do not add it to rtpBuffer
if header.SSRC != info.SSRC {
return writer.Write(header, payload, attributes)
}

pkt, err := n.packetFactory.NewPacket(header, payload, info.SSRCRetransmission, info.PayloadTypeRetransmission)
if err != nil {
return 0, err
Expand Down
78 changes: 75 additions & 3 deletions pkg/nack/responder_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestResponderInterceptor(t *testing.T) {
}()

for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))

select {
case p := <-stream.WrittenRTP():
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
}()

for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))

select {
case p := <-stream.WrittenRTP():
Expand All @@ -272,7 +272,7 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
},
})

// seq number 13 was never sent, so it can't be resent
// seq number 13 was never sent, so it can't be present
for _, seqNum := range []uint16{11, 12, 15} {
select {
case p := <-stream.WrittenRTP():
Expand All @@ -290,3 +290,75 @@ func TestResponderInterceptor_RFC4588(t *testing.T) {
case <-time.After(10 * time.Millisecond):
}
}

func TestResponderInterceptor_BypassUnknownSSRCs(t *testing.T) {
f, err := NewResponderInterceptor(
ResponderSize(8),
ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
require.NoError(t, err)

i, err := f.NewInterceptor("")
require.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 1,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}, i)
defer func() {
require.NoError(t, stream.Close())
}()

// Send some packets with both SSRCs to check that only SSRC=1 added to the buffer
for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 1}}))
// This packet should be bypassed and not added to the buffer.
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum, SSRC: 2}}))

select {
case p := <-stream.WrittenRTP():
require.Equal(t, seqNum, p.SequenceNumber)
require.Equal(t, uint32(1), p.SSRC)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}

select {
case p := <-stream.WrittenRTP():
require.Equal(t, seqNum, p.SequenceNumber)
require.Equal(t, uint32(2), p.SSRC)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}
}

// This packet should be bypassed and not added to the buffer.
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: 13, SSRC: 2}}))
select {
case p := <-stream.WrittenRTP():
require.Equal(t, uint16(13), p.SequenceNumber)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}

stream.ReceiveRTCP([]rtcp.Packet{
&rtcp.TransportLayerNack{
MediaSSRC: 1,
SenderSSRC: 1,
Nacks: []rtcp.NackPair{
{PacketID: 11, LostPackets: 0b1011}, // sequence numbers: 11, 12, 13, 15
},
},
})

// seq number 13 was sent with different ssrc, it should not be present
for _, seqNum := range []uint16{11, 12, 15} {
select {
case p := <-stream.WrittenRTP():
require.Equal(t, uint32(1), p.SSRC)
require.Equal(t, seqNum, p.SequenceNumber)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}
}
}

0 comments on commit ad324ac

Please sign in to comment.