Skip to content

Commit

Permalink
review fixes, rename receiver -> generator, sender -> responder, lint…
Browse files Browse the repository at this point in the history
…er fixes
  • Loading branch information
masterada committed Nov 29, 2020
1 parent fd6bc60 commit f62bc84
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 134 deletions.
13 changes: 13 additions & 0 deletions test/stream.go → internal/test/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pion/rtp"
)

// Stream is a helper struct for testing interceptors.
type Stream struct {
interceptor interceptor.Interceptor

Expand All @@ -26,16 +27,19 @@ type Stream struct {
rtpInModified chan RTPWithError
}

// RTPWithError is used to send an rtp packet or an error on a channel
type RTPWithError struct {
Packet *rtp.Packet
Err error
}

// RTCPWithError is used to send a batch of rtcp packets or an error on a channel
type RTCPWithError struct {
Packets []rtcp.Packet
Err error
}

// NewStream creates a new Stream
func NewStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Stream {
s := &Stream{
interceptor: i,
Expand Down Expand Up @@ -107,40 +111,49 @@ func NewStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Stream
return s
}

// WriteRTCP writes a batch of rtcp packet to the stream, using the interceptor
func (s *Stream) WriteRTCP(pkts []rtcp.Packet) error {
_, err := s.rtcpWriter.Write(pkts, interceptor.Attributes{})
return err
}

// WriteRTP writes an rtp packet to the stream, using the interceptor
func (s *Stream) WriteRTP(p *rtp.Packet) error {
_, err := s.rtpWriter.Write(p, interceptor.Attributes{})
return err
}

// ReceiveRTCP schedules a new rtcp batch, so it can be read be the stream
func (s *Stream) ReceiveRTCP(pkts []rtcp.Packet) {
s.rtcpIn <- pkts
}

// ReceiveRTP schedules a rtp packet, so it can be read be the stream
func (s *Stream) ReceiveRTP(packet *rtp.Packet) {
s.rtpIn <- packet
}

// WrittenRTCP returns a channel containing the rtcp batches written, modified by the interceptor
func (s *Stream) WrittenRTCP() chan []rtcp.Packet {
return s.rtcpOutModified
}

// WrittenRTP returns a channel containing rtp packets written, modified by the interceptor
func (s *Stream) WrittenRTP() chan *rtp.Packet {
return s.rtpOutModified
}

// ReadRTCP returns a channel containing the rtcp batched read, modified by the interceptor
func (s *Stream) ReadRTCP() chan RTCPWithError {
return s.rtcpInModified
}

// ReadRTP returns a channel containing the rtp packets read, modified by the interceptor
func (s *Stream) ReadRTP() chan RTPWithError {
return s.rtpInModified
}

// Close closes the stream and the underlying interceptor
func (s *Stream) Close() error {
close(s.rtcpIn)
close(s.rtpIn)
Expand Down
File renamed without changes.
8 changes: 8 additions & 0 deletions nack/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package nack

import "errors"

var (
// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied.
ErrInvalidSize = errors.New("invalid buffer size")
)
60 changes: 33 additions & 27 deletions nack/receiver_interceptor.go → nack/generator_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/pion/rtp"
)

// ReceiverInterceptor interceptor generates nack messages.
type ReceiverInterceptor struct {
// GeneratorInterceptor interceptor generates nack feedback messages.
type GeneratorInterceptor struct {
interceptor.NoOp
size uint16
skipLastN uint16
Expand All @@ -24,38 +24,43 @@ type ReceiverInterceptor struct {
log logging.LeveledLogger
}

// NewReceiverInterceptor returns a new ReceiverInterceptor interceptor
func NewReceiverInterceptor(size uint16, skipLastN uint16, interval time.Duration, log logging.LeveledLogger) (*ReceiverInterceptor, error) {
_, err := NewReceiveLog(size)
// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
r := &GeneratorInterceptor{
NoOp: interceptor.NoOp{},
size: 8192,
skipLastN: 0,
interval: time.Millisecond * 100,
receiveLogs: &sync.Map{},
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
}

for _, opt := range opts {
opt(r)
}

_, err := newReceiveLog(r.size)
if err != nil {
return nil, err
}

return &ReceiverInterceptor{
NoOp: interceptor.NoOp{},
size: size,
skipLastN: skipLastN,
interval: interval,
receiveLogs: &sync.Map{},
close: make(chan struct{}),
log: log,
}, nil
return r, nil
}

// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (n *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
n.m.Lock()
defer n.m.Unlock()
select {
case <-n.close:
// already closed
n.m.Unlock()
return writer
default:
}

n.wg.Add(1)
n.m.Unlock()

go n.loop(writer)

Expand All @@ -64,7 +69,7 @@ func (n *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) inte

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
hasNack := false
for _, fb := range info.RTCPFeedback {
if fb.Type == "nack" && fb.Parameter == "" {
Expand All @@ -76,8 +81,8 @@ func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, rea
return reader
}

// error is already checked in NewReceiverInterceptor
receiveLog, _ := NewReceiveLog(n.size)
// error is already checked in NewGeneratorInterceptor
receiveLog, _ := newReceiveLog(n.size)
n.receiveLogs.Store(info.SSRC, receiveLog)

return interceptor.RTPReaderFunc(func() (*rtp.Packet, interceptor.Attributes, error) {
Expand All @@ -86,18 +91,19 @@ func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, rea
return nil, nil, err
}

receiveLog.Add(p.SequenceNumber)
receiveLog.add(p.SequenceNumber)

return p, attr, nil
})
}

// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (n *ReceiverInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
n.receiveLogs.Delete(info.SSRC)
}

func (n *ReceiverInterceptor) Close() error {
// Close closes the interceptor
func (n *GeneratorInterceptor) Close() error {
defer n.wg.Wait()
n.m.Lock()
defer n.m.Unlock()
Expand All @@ -114,20 +120,20 @@ func (n *ReceiverInterceptor) Close() error {
return nil
}

func (n *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
defer n.wg.Done()

senderSSRC := rand.Uint32()
senderSSRC := rand.Uint32() // #nosec

ticker := time.NewTicker(n.interval)
for {
select {
case <-ticker.C:
n.receiveLogs.Range(func(key, value interface{}) bool {
ssrc := key.(uint32)
receiveLog := value.(*ReceiveLog)
receiveLog := value.(*receiveLog)

missing := receiveLog.MissingSeqNumbers(n.skipLastN)
missing := receiveLog.missingSeqNumbers(n.skipLastN)
if len(missing) == 0 {
return true
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package nack

import (
"errors"
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/test"
"github.com/pion/interceptor/internal/test"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestReceiverInterceptor(t *testing.T) {
func TestGeneratorInterceptor(t *testing.T) {
const interval = time.Millisecond * 10
i, err := NewReceiverInterceptor(64, 2, interval, logging.NewDefaultLoggerFactory().NewLogger("test"))
i, err := NewGeneratorInterceptor(
GeneratorSize(64),
GeneratorSkipLastN(2),
GeneratorInterval(interval),
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -63,8 +69,15 @@ func TestReceiverInterceptor(t *testing.T) {
}

assert.Equal(t, uint16(13), p.Nacks[0].PacketID)
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is set to 2)
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is setReceived to 2)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtcp packet not found")
}
}

func TestGeneratorInterceptor_InvalidSize(t *testing.T) {
_, err := NewGeneratorInterceptor(GeneratorSize(5))
if err == nil || !errors.Is(err, ErrInvalidSize) {
t.Fatalf("expected invalid size error, got: %v", err)
}
}
40 changes: 40 additions & 0 deletions nack/generator_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package nack

import (
"time"

"github.com/pion/logging"
)

// GeneratorOption can be used to configure GeneratorInterceptor
type GeneratorOption func(r *GeneratorInterceptor)

// GeneratorSize sets the size of the interceptor.
// Size must be one of: 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768
func GeneratorSize(size uint16) GeneratorOption {
return func(r *GeneratorInterceptor) {
r.size = size
}
}

// GeneratorSkipLastN sets the number of packets (n-1 packets before the last received packets) to ignore when generating
// nack requests.
func GeneratorSkipLastN(skipLastN uint16) GeneratorOption {
return func(r *GeneratorInterceptor) {
r.skipLastN = skipLastN
}
}

// GeneratorLog sets a logger for the interceptor
func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
return func(r *GeneratorInterceptor) {
r.log = log
}
}

// GeneratorInterval sets the nack send interval for the interceptor
func GeneratorInterval(interval time.Duration) GeneratorOption {
return func(r *GeneratorInterceptor) {
r.interval = interval
}
}
Loading

0 comments on commit f62bc84

Please sign in to comment.