diff --git a/go.mod b/go.mod index a1c10b719ec..d54295c5063 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/pion/datachannel v1.5.9 github.com/pion/dtls/v3 v3.0.2 github.com/pion/ice/v4 v4.0.1 - github.com/pion/interceptor v0.1.34 + github.com/pion/interceptor v0.1.36 github.com/pion/logging v0.2.2 github.com/pion/randutil v0.1.0 github.com/pion/rtcp v1.2.14 diff --git a/go.sum b/go.sum index bec6efe011e..8b6e121731a 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,8 @@ github.com/pion/dtls/v3 v3.0.2 h1:425DEeJ/jfuTTghhUDW0GtYZYIwwMtnKKJNMcWccTX0= github.com/pion/dtls/v3 v3.0.2/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k= github.com/pion/ice/v4 v4.0.1 h1:2d3tPoTR90F3TcGYeXUwucGlXI3hds96cwv4kjZmb9s= github.com/pion/ice/v4 v4.0.1/go.mod h1:2dpakjpd7+74L5j3TAe6gvkbI5UIzOgAnkimm9SuHvA= -github.com/pion/interceptor v0.1.34 h1:jb1MG9LTdQ4VVCSZDUbUzjeJNngzz4dBXcr2dL+ejfA= -github.com/pion/interceptor v0.1.34/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= +github.com/pion/interceptor v0.1.36 h1:WNOZUs5Vec3+NHeY6uGo4nvbxCcRglrI//DlUwLnl/M= +github.com/pion/interceptor v0.1.36/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM= diff --git a/peerconnection_go_test.go b/peerconnection_go_test.go index 0fe66d72a19..ef967dee965 100644 --- a/peerconnection_go_test.go +++ b/peerconnection_go_test.go @@ -931,7 +931,7 @@ func TestICERestart_Error_Handling(t *testing.T) { report := test.CheckRoutines(t) defer report() - offerPeerConnection, answerPeerConnection, wan := createVNetPair(t) + offerPeerConnection, answerPeerConnection, wan := createVNetPair(t, nil) pushICEState := func(i ICEConnectionState) { iceStates <- i } offerPeerConnection.OnICEConnectionStateChange(pushICEState) diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index 9887336a1df..74863148a53 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "io" + "math/rand" "regexp" "strings" "sync" @@ -322,7 +323,7 @@ func TestPeerConnection_Media_Disconnected(t *testing.T) { m := &MediaEngine{} assert.NoError(t, m.RegisterDefaultCodecs()) - pcOffer, pcAnswer, wan := createVNetPair(t) + pcOffer, pcAnswer, wan := createVNetPair(t, nil) keepPackets := &atomicBool{} keepPackets.set(true) @@ -1780,3 +1781,76 @@ func TestPeerConnection_Zero_PayloadType(t *testing.T) { closePairNow(t, pcOffer, pcAnswer) } + +// Assert that NACKs work E2E with no extra configuration. If media is sent over a lossy connection +// the user gets retransmitted RTP packets with no extra configuration +func Test_PeerConnection_RTX_E2E(t *testing.T) { + defer test.TimeOut(time.Second * 30).Stop() + + pcOffer, pcAnswer, wan := createVNetPair(t, nil) + + wan.AddChunkFilter(func(vnet.Chunk) bool { + return rand.Intn(5) != 4 //nolint: gosec + }) + + track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id") + assert.NoError(t, err) + + rtpSender, err := pcOffer.AddTrack(track) + assert.NoError(t, err) + + go func() { + rtcpBuf := make([]byte, 1500) + for { + if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil { + return + } + } + }() + + rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC + ssrc := rtpSender.GetParameters().Encodings[0].SSRC + + rtxRead, rtxReadCancel := context.WithCancel(context.Background()) + pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) { + for { + pkt, attributes, readRTPErr := track.ReadRTP() + if errors.Is(readRTPErr, io.EOF) { + return + } else if pkt.PayloadType == 0 { + continue + } + + assert.NotNil(t, pkt) + assert.Equal(t, pkt.SSRC, uint32(ssrc)) + assert.Equal(t, pkt.PayloadType, uint8(96)) + + rtxPayloadType := attributes.Get(AttributeRtxPayloadType) + rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber) + rtxSSRC := attributes.Get(AttributeRtxSsrc) + if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil { + assert.Equal(t, rtxPayloadType, uint8(97)) + assert.Equal(t, rtxSSRC, uint32(rtxSsrc)) + + rtxReadCancel() + } + } + }) + + assert.NoError(t, signalPair(pcOffer, pcAnswer)) + + func() { + for { + select { + case <-time.After(20 * time.Millisecond): + writeErr := track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}) + assert.NoError(t, writeErr) + case <-rtxRead.Done(): + return + } + } + }() + + assert.NoError(t, wan.Stop()) + closePairNow(t, pcOffer, pcAnswer) +} diff --git a/rtpreceiver_go_test.go b/rtpreceiver_go_test.go index 60c3fd8e95b..0d3ef2d4bf0 100644 --- a/rtpreceiver_go_test.go +++ b/rtpreceiver_go_test.go @@ -8,21 +8,16 @@ package webrtc import ( "context" - "encoding/binary" - "errors" - "io" "testing" "time" - "github.com/pion/rtp" "github.com/pion/sdp/v3" - "github.com/pion/transport/v3/test" "github.com/pion/webrtc/v4/pkg/media" "github.com/stretchr/testify/assert" ) func TestSetRTPParameters(t *testing.T) { - sender, receiver, wan := createVNetPair(t) + sender, receiver, wan := createVNetPair(t, nil) outgoingTrack, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion") assert.NoError(t, err) @@ -75,89 +70,3 @@ func TestSetRTPParameters(t *testing.T) { assert.NoError(t, wan.Stop()) closePairNow(t, sender, receiver) } - -// Assert the behavior of reading a RTX with a distinct SSRC -// All the attributes should be populated and the packet unpacked -func Test_RTX_Read(t *testing.T) { - defer test.TimeOut(time.Second * 30).Stop() - - pcOffer, pcAnswer, err := newPair() - assert.NoError(t, err) - - track, err := NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeVP8}, "track-id", "stream-id") - assert.NoError(t, err) - - rtpSender, err := pcOffer.AddTrack(track) - assert.NoError(t, err) - - rtxSsrc := rtpSender.GetParameters().Encodings[0].RTX.SSRC - ssrc := rtpSender.GetParameters().Encodings[0].SSRC - - rtxRead, rtxReadCancel := context.WithCancel(context.Background()) - pcAnswer.OnTrack(func(track *TrackRemote, _ *RTPReceiver) { - for { - pkt, attributes, readRTPErr := track.ReadRTP() - if errors.Is(readRTPErr, io.EOF) { - return - } else if pkt.PayloadType == 0 { - continue - } - - assert.NoError(t, readRTPErr) - assert.NotNil(t, pkt) - assert.Equal(t, pkt.SSRC, uint32(ssrc)) - assert.Equal(t, pkt.PayloadType, uint8(96)) - assert.Equal(t, pkt.Payload, []byte{0xB, 0xA, 0xD}) - - rtxPayloadType := attributes.Get(AttributeRtxPayloadType) - rtxSequenceNumber := attributes.Get(AttributeRtxSequenceNumber) - rtxSSRC := attributes.Get(AttributeRtxSsrc) - if rtxPayloadType != nil && rtxSequenceNumber != nil && rtxSSRC != nil { - assert.Equal(t, rtxPayloadType, uint8(97)) - assert.Equal(t, rtxSSRC, uint32(rtxSsrc)) - assert.Equal(t, rtxSequenceNumber, pkt.SequenceNumber+500) - - rtxReadCancel() - } - } - }) - - assert.NoError(t, signalPair(pcOffer, pcAnswer)) - - func() { - for i := uint16(0); ; i++ { - pkt := rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SSRC: uint32(ssrc), - PayloadType: 96, - SequenceNumber: i, - }, - Payload: []byte{0xB, 0xA, 0xD}, - } - - select { - case <-time.After(20 * time.Millisecond): - // Send the original packet - err = track.WriteRTP(&pkt) - assert.NoError(t, err) - - rtxPayload := []byte{0x0, 0x0, 0xB, 0xA, 0xD} - binary.BigEndian.PutUint16(rtxPayload[0:2], pkt.Header.SequenceNumber) - - // Send the RTX - _, err = track.bindings[0].writeStream.WriteRTP(&rtp.Header{ - Version: 2, - SSRC: uint32(rtxSsrc), - PayloadType: 97, - SequenceNumber: i + 500, - }, rtxPayload) - assert.NoError(t, err) - case <-rtxRead.Done(): - return - } - } - }() - - closePairNow(t, pcOffer, pcAnswer) -} diff --git a/rtpreceiver_test.go b/rtpreceiver_test.go index 33e47917fb3..40c7900d0ab 100644 --- a/rtpreceiver_test.go +++ b/rtpreceiver_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/pion/interceptor" "github.com/pion/transport/v3/test" "github.com/pion/webrtc/v4/pkg/media" "github.com/stretchr/testify/assert" @@ -25,7 +26,7 @@ func Test_RTPReceiver_SetReadDeadline(t *testing.T) { report := test.CheckRoutines(t) defer report() - sender, receiver, wan := createVNetPair(t) + sender, receiver, wan := createVNetPair(t, &interceptor.Registry{}) track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion") assert.NoError(t, err) diff --git a/rtpsender_test.go b/rtpsender_test.go index e938949c931..f0e85a82454 100644 --- a/rtpsender_test.go +++ b/rtpsender_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/pion/interceptor" "github.com/pion/transport/v3/test" "github.com/pion/webrtc/v4/pkg/media" "github.com/stretchr/testify/assert" @@ -157,7 +158,7 @@ func Test_RTPSender_SetReadDeadline(t *testing.T) { report := test.CheckRoutines(t) defer report() - sender, receiver, wan := createVNetPair(t) + sender, receiver, wan := createVNetPair(t, &interceptor.Registry{}) track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion") assert.NoError(t, err) diff --git a/vnet_test.go b/vnet_test.go index 3cb89044986..a527ae665ce 100644 --- a/vnet_test.go +++ b/vnet_test.go @@ -16,7 +16,7 @@ import ( "github.com/stretchr/testify/assert" ) -func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Router) { +func createVNetPair(t *testing.T, interceptorRegistry *interceptor.Registry) (*PeerConnection, *PeerConnection, *vnet.Router) { // Create a root router wan, err := vnet.NewRouter(&vnet.RouterConfig{ CIDR: "1.2.3.0/24", @@ -53,12 +53,18 @@ func createVNetPair(t *testing.T) (*PeerConnection, *PeerConnection, *vnet.Route // Start the virtual network by calling Start() on the root router assert.NoError(t, wan.Start()) - offerInterceptorRegistry := &interceptor.Registry{} - offerPeerConnection, err := NewAPI(WithSettingEngine(offerSettingEngine), WithInterceptorRegistry(offerInterceptorRegistry)).NewPeerConnection(Configuration{}) + offerOptions := []func(*API){WithSettingEngine(offerSettingEngine)} + if interceptorRegistry != nil { + offerOptions = append(offerOptions, WithInterceptorRegistry(interceptorRegistry)) + } + offerPeerConnection, err := NewAPI(offerOptions...).NewPeerConnection(Configuration{}) assert.NoError(t, err) - answerInterceptorRegistry := &interceptor.Registry{} - answerPeerConnection, err := NewAPI(WithSettingEngine(answerSettingEngine), WithInterceptorRegistry(answerInterceptorRegistry)).NewPeerConnection(Configuration{}) + answerOptions := []func(*API){WithSettingEngine(answerSettingEngine)} + if interceptorRegistry != nil { + answerOptions = append(answerOptions, WithInterceptorRegistry(interceptorRegistry)) + } + answerPeerConnection, err := NewAPI(answerOptions...).NewPeerConnection(Configuration{}) assert.NoError(t, err) return offerPeerConnection, answerPeerConnection, wan