From 8d50647717e6c4009d657f74be48c1ab6ef06ef4 Mon Sep 17 00:00:00 2001 From: Adam Kiss Date: Fri, 27 Nov 2020 17:49:02 +0100 Subject: [PATCH] Add Nack Interceptors Add ResponderInterceptor which responds to NACK Requests Add GeneratorInterceptor which generates NACK Requests --- chain.go | 2 - go.mod | 4 +- go.sum | 6 +- interceptor.go | 2 - nack.go | 14 --- noop.go | 2 - pkg/nack/errors.go | 7 ++ pkg/nack/generator_interceptor.go | 157 +++++++++++++++++++++++++ pkg/nack/generator_interceptor_test.go | 70 +++++++++++ pkg/nack/generator_option.go | 40 +++++++ pkg/nack/receive_log.go | 134 +++++++++++++++++++++ pkg/nack/receive_log_test.go | 134 +++++++++++++++++++++ pkg/nack/responder_interceptor.go | 127 ++++++++++++++++++++ pkg/nack/responder_interceptor_test.go | 71 +++++++++++ pkg/nack/responder_option.go | 21 ++++ pkg/nack/send_buffer.go | 74 ++++++++++++ pkg/nack/send_buffer_test.go | 64 ++++++++++ registry.go | 2 - streaminfo.go | 2 - 19 files changed, 906 insertions(+), 27 deletions(-) delete mode 100644 nack.go create mode 100644 pkg/nack/errors.go create mode 100644 pkg/nack/generator_interceptor.go create mode 100644 pkg/nack/generator_interceptor_test.go create mode 100644 pkg/nack/generator_option.go create mode 100644 pkg/nack/receive_log.go create mode 100644 pkg/nack/receive_log_test.go create mode 100644 pkg/nack/responder_interceptor.go create mode 100644 pkg/nack/responder_interceptor_test.go create mode 100644 pkg/nack/responder_option.go create mode 100644 pkg/nack/send_buffer.go create mode 100644 pkg/nack/send_buffer_test.go diff --git a/chain.go b/chain.go index c926b125..32e27c1c 100644 --- a/chain.go +++ b/chain.go @@ -1,5 +1,3 @@ -// +build !js - package interceptor // Chain is an interceptor that runs all child interceptors in order. diff --git a/go.mod b/go.mod index fa12d1ca..4091812e 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/pion/interceptor go 1.15 require ( - github.com/pion/rtcp v1.2.4 + github.com/pion/logging v0.2.2 + github.com/pion/rtcp v1.2.6 github.com/pion/rtp v1.6.1 + github.com/stretchr/testify v1.6.1 ) diff --git a/go.sum b/go.sum index a9cabf9d..60ea4354 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,11 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= -github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM= -github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= +github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo= +github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk= github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/interceptor.go b/interceptor.go index d168a836..546bbf8a 100644 --- a/interceptor.go +++ b/interceptor.go @@ -1,5 +1,3 @@ -// +build !js - // Package interceptor contains the Interceptor interface, with some useful interceptors that should be safe to use // in most cases. package interceptor diff --git a/nack.go b/nack.go deleted file mode 100644 index 4e6131ad..00000000 --- a/nack.go +++ /dev/null @@ -1,14 +0,0 @@ -// +build !js - -package interceptor - -// NACK interceptor generates/responds to nack messages. -type NACK struct { - NoOp -} - -// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method -// will be called once per rtp packet. -func (n *NACK) BindRemoteStream(_ *StreamInfo, reader RTPReader) RTPReader { - return reader -} diff --git a/noop.go b/noop.go index c8c23c26..2dc4e8e2 100644 --- a/noop.go +++ b/noop.go @@ -1,5 +1,3 @@ -// +build !js - package interceptor // NoOp is an Interceptor that does not modify any packets. It can embedded in other interceptors, so it's diff --git a/pkg/nack/errors.go b/pkg/nack/errors.go new file mode 100644 index 00000000..588d3149 --- /dev/null +++ b/pkg/nack/errors.go @@ -0,0 +1,7 @@ +// Package nack provides interceptors to implement sending and receiving negative acknowledgements +package nack + +import "errors" + +// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied. +var ErrInvalidSize = errors.New("invalid buffer size") diff --git a/pkg/nack/generator_interceptor.go b/pkg/nack/generator_interceptor.go new file mode 100644 index 00000000..abe528f5 --- /dev/null +++ b/pkg/nack/generator_interceptor.go @@ -0,0 +1,157 @@ +package nack + +import ( + "math/rand" + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// GeneratorInterceptor interceptor generates nack feedback messages. +type GeneratorInterceptor struct { + interceptor.NoOp + size uint16 + skipLastN uint16 + interval time.Duration + receiveLogs *sync.Map + m sync.Mutex + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger +} + +// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor +func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) { + r := &GeneratorInterceptor{ + NoOp: interceptor.NoOp{}, + size: 8192, + skipLastN: 0, + interval: time.Millisecond * 100, + receiveLogs: &sync.Map{}, + close: make(chan struct{}), + log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"), + } + + for _, opt := range opts { + opt(r) + } + + if _, err := newReceiveLog(r.size); err != nil { + return nil, err + } + + return r, nil +} + +// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method +// will be called once per packet batch. +func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter { + n.m.Lock() + defer n.m.Unlock() + select { + case <-n.close: + // already closed + return writer + default: + } + + n.wg.Add(1) + + go n.loop(writer) + + return writer +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + hasNack := false + for _, fb := range info.RTCPFeedback { + if fb.Type == "nack" && fb.Parameter == "" { + hasNack = true + } + } + + if !hasNack { + return reader + } + + // error is already checked in NewGeneratorInterceptor + receiveLog, _ := newReceiveLog(n.size) + n.receiveLogs.Store(info.SSRC, receiveLog) + + return interceptor.RTPReaderFunc(func() (*rtp.Packet, interceptor.Attributes, error) { + p, attr, err := reader.Read() + if err != nil { + return nil, nil, err + } + + receiveLog.add(p.SequenceNumber) + + return p, attr, nil + }) +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + n.receiveLogs.Delete(info.SSRC) +} + +// Close closes the interceptor +func (n *GeneratorInterceptor) Close() error { + defer n.wg.Wait() + n.m.Lock() + defer n.m.Unlock() + + select { + case <-n.close: + // already closed + return nil + default: + } + + close(n.close) + + return nil +} + +func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) { + defer n.wg.Done() + + senderSSRC := rand.Uint32() // #nosec + + ticker := time.NewTicker(n.interval) + for { + select { + case <-ticker.C: + n.receiveLogs.Range(func(key, value interface{}) bool { + ssrc := key.(uint32) + receiveLog := value.(*receiveLog) + + missing := receiveLog.missingSeqNumbers(n.skipLastN) + if len(missing) == 0 { + return true + } + + nack := &rtcp.TransportLayerNack{ + SenderSSRC: senderSSRC, + MediaSSRC: ssrc, + Nacks: rtcp.NackPairsFromSequenceNumbers(missing), + } + + if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil { + n.log.Warnf("failed sending nack: %+v", err) + } + + return true + }) + + case <-n.close: + return + } + } +} diff --git a/pkg/nack/generator_interceptor_test.go b/pkg/nack/generator_interceptor_test.go new file mode 100644 index 00000000..481a42c6 --- /dev/null +++ b/pkg/nack/generator_interceptor_test.go @@ -0,0 +1,70 @@ +package nack + +import ( + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestGeneratorInterceptor(t *testing.T) { + const interval = time.Millisecond * 10 + i, err := NewGeneratorInterceptor( + GeneratorSize(64), + GeneratorSkipLastN(2), + GeneratorInterval(interval), + GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 1, + RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}}, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + for _, seqNum := range []uint16{10, 11, 12, 14, 16, 18} { + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}) + + select { + case r := <-stream.ReadRTP(): + assert.NoError(t, r.Err) + assert.Equal(t, seqNum, r.Packet.SequenceNumber) + case <-time.After(10 * time.Millisecond): + t.Fatal("receiver rtp packet not found") + } + } + + time.Sleep(interval * 2) // wait for at least 2 nack packets + + select { + case <-stream.WrittenRTCP(): + // ignore the first nack, it might only contain the sequence id 13 as missing + default: + } + + select { + case pkts := <-stream.WrittenRTCP(): + assert.Equal(t, len(pkts), 1, "single packet RTCP Compound Packet expected") + + p, ok := pkts[0].(*rtcp.TransportLayerNack) + assert.True(t, ok, "TransportLayerNack rtcp packet expected, found: %T", pkts[0]) + + assert.Equal(t, uint16(13), p.Nacks[0].PacketID) + assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is setReceived to 2) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtcp packet not found") + } +} + +func TestGeneratorInterceptor_InvalidSize(t *testing.T) { + _, err := NewGeneratorInterceptor(GeneratorSize(5)) + assert.Error(t, err, ErrInvalidSize) +} diff --git a/pkg/nack/generator_option.go b/pkg/nack/generator_option.go new file mode 100644 index 00000000..86c627ba --- /dev/null +++ b/pkg/nack/generator_option.go @@ -0,0 +1,40 @@ +package nack + +import ( + "time" + + "github.com/pion/logging" +) + +// GeneratorOption can be used to configure GeneratorInterceptor +type GeneratorOption func(r *GeneratorInterceptor) + +// GeneratorSize sets the size of the interceptor. +// Size must be one of: 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768 +func GeneratorSize(size uint16) GeneratorOption { + return func(r *GeneratorInterceptor) { + r.size = size + } +} + +// GeneratorSkipLastN sets the number of packets (n-1 packets before the last received packets) to ignore when generating +// nack requests. +func GeneratorSkipLastN(skipLastN uint16) GeneratorOption { + return func(r *GeneratorInterceptor) { + r.skipLastN = skipLastN + } +} + +// GeneratorLog sets a logger for the interceptor +func GeneratorLog(log logging.LeveledLogger) GeneratorOption { + return func(r *GeneratorInterceptor) { + r.log = log + } +} + +// GeneratorInterval sets the nack send interval for the interceptor +func GeneratorInterval(interval time.Duration) GeneratorOption { + return func(r *GeneratorInterceptor) { + r.interval = interval + } +} diff --git a/pkg/nack/receive_log.go b/pkg/nack/receive_log.go new file mode 100644 index 00000000..8107f59a --- /dev/null +++ b/pkg/nack/receive_log.go @@ -0,0 +1,134 @@ +package nack + +import ( + "fmt" + "sync" +) + +type receiveLog struct { + packets []uint64 + size uint16 + end uint16 + started bool + lastConsecutive uint16 + m sync.RWMutex +} + +func newReceiveLog(size uint16) (*receiveLog, error) { + allowedSizes := make([]uint16, 0) + correctSize := false + for i := 6; i < 16; i++ { + if size == 1< end (with counting for rollovers) + for i := s.end + 1; i != seq; i++ { + // clear packets between end and seq (these may contain packets from a "size" ago) + s.delReceived(i) + } + s.end = seq + + if s.lastConsecutive+1 == seq { + s.lastConsecutive = seq + } else if seq-s.lastConsecutive > s.size { + s.lastConsecutive = seq - s.size + s.fixLastConsecutive() // there might be valid packets at the beginning of the buffer now + } + case s.lastConsecutive+1 == seq: + // negative diff, seq < end (with counting for rollovers) + s.lastConsecutive = seq + s.fixLastConsecutive() // there might be other valid packets after seq + } + + s.setReceived(seq) +} + +func (s *receiveLog) get(seq uint16) bool { + s.m.RLock() + defer s.m.RUnlock() + + diff := s.end - seq + if diff >= uint16SizeHalf { + return false + } + + if diff >= s.size { + return false + } + + return s.getReceived(seq) +} + +func (s *receiveLog) missingSeqNumbers(skipLastN uint16) []uint16 { + s.m.RLock() + defer s.m.RUnlock() + + until := s.end - skipLastN + if until-s.lastConsecutive >= uint16SizeHalf { + // until < s.lastConsecutive (counting for rollover) + return nil + } + + missingPacketSeqNums := make([]uint16, 0) + for i := s.lastConsecutive + 1; i != until+1; i++ { + if !s.getReceived(i) { + missingPacketSeqNums = append(missingPacketSeqNums, i) + } + } + + return missingPacketSeqNums +} + +func (s *receiveLog) setReceived(seq uint16) { + pos := seq % s.size + s.packets[pos/64] |= 1 << (pos % 64) +} + +func (s *receiveLog) delReceived(seq uint16) { + pos := seq % s.size + s.packets[pos/64] &^= 1 << (pos % 64) +} + +func (s *receiveLog) getReceived(seq uint16) bool { + pos := seq % s.size + return (s.packets[pos/64] & (1 << (pos % 64))) != 0 +} + +func (s *receiveLog) fixLastConsecutive() { + i := s.lastConsecutive + 1 + for ; i != s.end+1 && s.getReceived(i); i++ { + // find all consecutive packets + } + s.lastConsecutive = i - 1 +} diff --git a/pkg/nack/receive_log_test.go b/pkg/nack/receive_log_test.go new file mode 100644 index 00000000..a631a03e --- /dev/null +++ b/pkg/nack/receive_log_test.go @@ -0,0 +1,134 @@ +package nack + +import ( + "reflect" + "testing" +) + +func TestReceivedBuffer(t *testing.T) { + for _, start := range []uint16{0, 1, 127, 128, 129, 511, 512, 513, 32767, 32768, 32769, 65407, 65408, 65409, 65534, 65535} { + start := start + + rl, err := newReceiveLog(128) + if err != nil { + t.Fatalf("%+v", err) + } + + all := func(min uint16, max uint16) []uint16 { + result := make([]uint16, 0) + for i := min; i != max+1; i++ { + result = append(result, i) + } + return result + } + join := func(parts ...[]uint16) []uint16 { + result := make([]uint16, 0) + for _, p := range parts { + result = append(result, p...) + } + return result + } + + add := func(nums ...uint16) { + for _, n := range nums { + seq := start + n + rl.add(seq) + } + } + + assertGet := func(nums ...uint16) { + t.Helper() + for _, n := range nums { + seq := start + n + if !rl.get(seq) { + t.Errorf("not found: %d", seq) + } + } + } + assertNOTGet := func(nums ...uint16) { + t.Helper() + for _, n := range nums { + seq := start + n + if rl.get(seq) { + t.Errorf("packet found for %d", seq) + } + } + } + assertMissing := func(skipLastN uint16, nums []uint16) { + t.Helper() + missing := rl.missingSeqNumbers(skipLastN) + if missing == nil { + missing = []uint16{} + } + want := make([]uint16, 0, len(nums)) + for _, n := range nums { + want = append(want, start+n) + } + if !reflect.DeepEqual(want, missing) { + t.Errorf("missing want/got %v / %v", want, missing) + } + } + assertLastConsecutive := func(lastConsecutive uint16) { + want := lastConsecutive + start + if rl.lastConsecutive != want { + t.Errorf("invalid lastConsecutive want %d got %d", want, rl.lastConsecutive) + } + } + + add(0) + assertGet(0) + assertMissing(0, []uint16{}) + assertLastConsecutive(0) // first element added + + add(all(1, 127)...) + assertGet(all(1, 127)...) + assertMissing(0, []uint16{}) + assertLastConsecutive(127) + + add(128) + assertGet(128) + assertNOTGet(0) + assertMissing(0, []uint16{}) + assertLastConsecutive(128) + + add(130) + assertGet(130) + assertNOTGet(1, 2, 129) + assertMissing(0, []uint16{129}) + assertLastConsecutive(128) + + add(333) + assertGet(333) + assertNOTGet(all(0, 332)...) + assertMissing(0, all(206, 332)) // all 127 elements missing before 333 + assertMissing(10, all(206, 323)) // skip last 10 packets (324-333) from check + assertLastConsecutive(205) // lastConsecutive is still out of the buffer + + add(329) + assertGet(329) + assertMissing(0, join(all(206, 328), all(330, 332))) + assertMissing(5, join(all(206, 328))) // skip last 5 packets (329-333) from check + assertLastConsecutive(205) + + add(all(207, 320)...) + assertGet(all(207, 320)...) + assertMissing(0, join([]uint16{206}, all(321, 328), all(330, 332))) + assertLastConsecutive(205) + + add(334) + assertGet(334) + assertNOTGet(206) + assertMissing(0, join(all(321, 328), all(330, 332))) + assertLastConsecutive(320) // head of buffer is full of consecutive packages + + add(all(322, 328)...) + assertGet(all(322, 328)...) + assertMissing(0, join([]uint16{321}, all(330, 332))) + assertLastConsecutive(320) + + add(321) + assertGet(321) + assertMissing(0, all(330, 332)) + assertLastConsecutive(329) // after adding a single missing packet, lastConsecutive should jump forward + } +} diff --git a/pkg/nack/responder_interceptor.go b/pkg/nack/responder_interceptor.go new file mode 100644 index 00000000..b9fdc539 --- /dev/null +++ b/pkg/nack/responder_interceptor.go @@ -0,0 +1,127 @@ +package nack + +import ( + "sync" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// ResponderInterceptor responds to nack feedback messages +type ResponderInterceptor struct { + interceptor.NoOp + size uint16 + streams *sync.Map + log logging.LeveledLogger +} + +type localStream struct { + sendBuffer *sendBuffer + rtpWriter interceptor.RTPWriter +} + +// NewResponderInterceptor returns a new GeneratorInterceptor interceptor +func NewResponderInterceptor(opts ...ResponderOption) (*ResponderInterceptor, error) { + r := &ResponderInterceptor{ + NoOp: interceptor.NoOp{}, + size: 8192, + streams: &sync.Map{}, + log: logging.NewDefaultLoggerFactory().NewLogger("nack_responder"), + } + + for _, opt := range opts { + opt(r) + } + + _, err := newSendBuffer(r.size) + if err != nil { + return nil, err + } + + return r, nil +} + +// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might +// change in the future. The returned method will be called once per packet batch. +func (n *ResponderInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return interceptor.RTCPReaderFunc(func() ([]rtcp.Packet, interceptor.Attributes, error) { + pkts, attr, err := reader.Read() + if err != nil { + return nil, nil, err + } + + for _, rtcpPacket := range pkts { + nack, ok := rtcpPacket.(*rtcp.TransportLayerNack) + if !ok { + continue + } + + go n.resendPackets(nack) + } + + return pkts, attr, err + }) +} + +// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method +// will be called once per rtp packet. +func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + hasNack := false + for _, fb := range info.RTCPFeedback { + if fb.Type == "nack" && fb.Parameter == "" { + hasNack = true + } + } + + if !hasNack { + return writer + } + + // error is already checked in NewGeneratorInterceptor + sendBuffer, _ := newSendBuffer(n.size) + n.streams.Store(info.SSRC, &localStream{sendBuffer: sendBuffer, rtpWriter: writer}) + + return interceptor.RTPWriterFunc(func(p *rtp.Packet, attributes interceptor.Attributes) (int, error) { + sendBuffer.add(p) + + return writer.Write(p, attributes) + }) +} + +// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (n *ResponderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) { + n.streams.Delete(info.SSRC) +} + +func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) { + v, ok := n.streams.Load(nack.MediaSSRC) + if !ok { + return + } + + stream := v.(*localStream) + seqNums := nackParsToSequenceNumbers(nack.Nacks) + + for _, seq := range seqNums { + p := stream.sendBuffer.get(seq) + if p == nil { + continue + } + + _, err := stream.rtpWriter.Write(p, interceptor.Attributes{}) + if err != nil { + n.log.Warnf("failed resending nacked packet: %+v", err) + } + } +} + +func nackParsToSequenceNumbers(pairs []rtcp.NackPair) []uint16 { + seqs := make([]uint16, 0) + for _, pair := range pairs { + seqs = append(seqs, pair.PacketList()...) + } + + return seqs +} diff --git a/pkg/nack/responder_interceptor_test.go b/pkg/nack/responder_interceptor_test.go new file mode 100644 index 00000000..5ff322d3 --- /dev/null +++ b/pkg/nack/responder_interceptor_test.go @@ -0,0 +1,71 @@ +package nack + +import ( + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestResponderInterceptor(t *testing.T) { + i, err := NewResponderInterceptor( + ResponderSize(8), + ResponderLog(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 1, + RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}}, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + for _, seqNum := range []uint16{10, 11, 12, 14, 15} { + assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})) + + select { + case p := <-stream.WrittenRTP(): + assert.Equal(t, seqNum, p.SequenceNumber) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtp packet not found") + } + } + + stream.ReceiveRTCP([]rtcp.Packet{ + &rtcp.TransportLayerNack{ + MediaSSRC: 1, + SenderSSRC: 2, + Nacks: []rtcp.NackPair{ + {PacketID: 11, LostPackets: 0b1011}, // sequence numbers: 11, 12, 13, 15 + }, + }, + }) + + // seq number 13 was never sent, so it can't be resent + for _, seqNum := range []uint16{11, 12, 15} { + select { + case p := <-stream.WrittenRTP(): + assert.Equal(t, seqNum, p.SequenceNumber) + case <-time.After(10 * time.Millisecond): + t.Fatal("written rtp packet not found") + } + } + + select { + case p := <-stream.WrittenRTP(): + t.Errorf("no more rtp packets expected, found sequence number: %v", p.SequenceNumber) + case <-time.After(10 * time.Millisecond): + } +} + +func TestResponderInterceptor_InvalidSize(t *testing.T) { + _, err := NewResponderInterceptor(ResponderSize(5)) + assert.Error(t, err, ErrInvalidSize) +} diff --git a/pkg/nack/responder_option.go b/pkg/nack/responder_option.go new file mode 100644 index 00000000..a4a1d292 --- /dev/null +++ b/pkg/nack/responder_option.go @@ -0,0 +1,21 @@ +package nack + +import "github.com/pion/logging" + +// ResponderOption can be used to configure ResponderInterceptor +type ResponderOption func(s *ResponderInterceptor) + +// ResponderSize sets the size of the interceptor. +// Size must be one of: 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768 +func ResponderSize(size uint16) ResponderOption { + return func(r *ResponderInterceptor) { + r.size = size + } +} + +// ResponderLog sets a logger for the interceptor +func ResponderLog(log logging.LeveledLogger) ResponderOption { + return func(r *ResponderInterceptor) { + r.log = log + } +} diff --git a/pkg/nack/send_buffer.go b/pkg/nack/send_buffer.go new file mode 100644 index 00000000..cf3f020e --- /dev/null +++ b/pkg/nack/send_buffer.go @@ -0,0 +1,74 @@ +package nack + +import ( + "fmt" + + "github.com/pion/rtp" +) + +const ( + uint16SizeHalf = 1 << 15 +) + +type sendBuffer struct { + packets []*rtp.Packet + size uint16 + lastAdded uint16 + started bool +} + +func newSendBuffer(size uint16) (*sendBuffer, error) { + allowedSizes := make([]uint16, 0) + correctSize := false + for i := 0; i < 16; i++ { + if size == 1<= uint16SizeHalf { + return nil + } + + if diff >= s.size { + return nil + } + + return s.packets[seq%s.size] +} diff --git a/pkg/nack/send_buffer_test.go b/pkg/nack/send_buffer_test.go new file mode 100644 index 00000000..afba2169 --- /dev/null +++ b/pkg/nack/send_buffer_test.go @@ -0,0 +1,64 @@ +package nack + +import ( + "testing" + + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestSendBuffer(t *testing.T) { + for _, start := range []uint16{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 511, 512, 513, 32767, 32768, 32769, 65527, 65528, 65529, 65530, 65531, 65532, 65533, 65534, 65535} { + start := start + + sb, err := newSendBuffer(8) + assert.NoError(t, err) + + add := func(nums ...uint16) { + for _, n := range nums { + seq := start + n + sb.add(&rtp.Packet{Header: rtp.Header{SequenceNumber: seq}}) + } + } + + assertGet := func(nums ...uint16) { + t.Helper() + for _, n := range nums { + seq := start + n + packet := sb.get(seq) + if packet == nil { + t.Errorf("packet not found: %d", seq) + continue + } + if packet.SequenceNumber != seq { + t.Errorf("packet for %d returned with incorrect SequenceNumber: %d", seq, packet.SequenceNumber) + } + } + } + assertNOTGet := func(nums ...uint16) { + t.Helper() + for _, n := range nums { + seq := start + n + packet := sb.get(seq) + if packet != nil { + t.Errorf("packet found for %d: %d", seq, packet.SequenceNumber) + } + } + } + + add(0, 1, 2, 3, 4, 5, 6, 7) + assertGet(0, 1, 2, 3, 4, 5, 6, 7) + + add(8) + assertGet(8) + assertNOTGet(0) + + add(10) + assertGet(10) + assertNOTGet(1, 2, 9) + + add(22) + assertGet(22) + assertNOTGet(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21) + } +} diff --git a/registry.go b/registry.go index f504f1ae..d3eed3bf 100644 --- a/registry.go +++ b/registry.go @@ -1,5 +1,3 @@ -// +build !js - package interceptor // Registry is a collector for interceptors. diff --git a/streaminfo.go b/streaminfo.go index ea90009a..956fa530 100644 --- a/streaminfo.go +++ b/streaminfo.go @@ -1,5 +1,3 @@ -// +build !js - package interceptor // RTPHeaderExtension represents a negotiated RFC5285 RTP header extension.