Skip to content

Commit

Permalink
JitterBuffer: Add a JitterBuffer-based Interceptor
Browse files Browse the repository at this point in the history
The JitterBufferInterceptor is designed to fit in a RemoteStream
pipeline and buffer incoming packets for a short period (currently
defaulting to 50 packets) before emitting packets to be consumed by the
next step in the pipeline.

The caller must ensure they are prepared to handle an
ErrPopWhileBuffering in the case that insufficient packets have been
received by the jitter buffer. The caller should retry the operation
at some point later as the buffer may have been filled in the interim.

The caller should also be aware that an ErrBufferUnderrun may be
returned in the case that the initial buffering was sufficient and
playback began but the caller is consuming packets (or they are not
arriving) quickly enough.
  • Loading branch information
thatsnotright authored and Sean-Der committed Apr 21, 2024
1 parent 35da023 commit 72c0be8
Show file tree
Hide file tree
Showing 9 changed files with 288 additions and 5 deletions.
4 changes: 2 additions & 2 deletions attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (a Attributes) Set(key interface{}, val interface{}) {
}

// GetRTPHeader gets the RTP header if present. If it is not present, it will be
// unmarshalled from the raw byte slice and stored in the attribtues.
// unmarshalled from the raw byte slice and stored in the attributes.
func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
if val, ok := a[rtpHeaderKey]; ok {
if header, ok := val.(*rtp.Header); ok {
Expand All @@ -50,7 +50,7 @@ func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
}

// GetRTCPPackets gets the RTCP packets if present. If the packet slice is not
// present, it will be unmarshaled from the raw byte slice and stored in the
// present, it will be unmarshalled from the raw byte slice and stored in the
// attributes.
func (a Attributes) GetRTCPPackets(raw []byte) ([]rtcp.Packet, error) {
if val, ok := a[rtcpPacketsKey]; ok {
Expand Down
7 changes: 5 additions & 2 deletions internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
for {
i, _, err := s.rtpReader.Read(buf, interceptor.Attributes{})
if err != nil {
if err.Error() == "attempt to pop while buffering" {
continue
}
if errors.Is(err, io.EOF) {
s.rtpInModified <- RTPWithError{Err: err}
}
Expand Down Expand Up @@ -160,12 +163,12 @@ func (s *MockStream) WriteRTP(p *rtp.Packet) error {
return err
}

// ReceiveRTCP schedules a new rtcp batch, so it can be read be the stream
// ReceiveRTCP schedules a new rtcp batch, so it can be read by the stream
func (s *MockStream) ReceiveRTCP(pkts []rtcp.Packet) {
s.rtcpIn <- pkts
}

// ReceiveRTP schedules a rtp packet, so it can be read be the stream
// ReceiveRTP schedules a rtp packet, so it can be read by the stream
func (s *MockStream) ReceiveRTP(packet *rtp.Packet) {
s.rtpIn <- packet
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/jitterbuffer/jitter_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,16 @@ func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) {
jb.updateState()
return packet, nil
}

// Clear will empty the buffer and optionally reset the state
func (jb *JitterBuffer) Clear(resetState bool) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
jb.packets.Clear()
if resetState {
jb.lastSequence = 0
jb.state = Buffering
jb.stats = Stats{0, 0, 0}
jb.minStartCount = 50
}
}
17 changes: 16 additions & 1 deletion pkg/jitterbuffer/jitter_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func TestJitterBuffer(t *testing.T) {

jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5012))
assert.Equal(jb.stats.outOfOrderCount, uint32(1))
assert.Equal(jb.packets.Length(), uint16(4))
assert.Equal(jb.lastSequence, uint16(5012))
Expand Down Expand Up @@ -220,4 +219,20 @@ func TestJitterBuffer(t *testing.T) {
assert.NotNil(pkt)
}
})

t.Run("Allows clearing the buffer", func(*testing.T) {
jb := New()
jb.Clear(false)

assert.Equal(jb.lastSequence, uint16(0))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})

assert.Equal(jb.lastSequence, uint16(5002))
jb.Clear(true)
assert.Equal(jb.lastSequence, uint16(0))
assert.Equal(jb.stats.outOfOrderCount, uint32(0))
assert.Equal(jb.packets.Length(), uint16(0))
})
}
19 changes: 19 additions & 0 deletions pkg/jitterbuffer/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"github.com/pion/logging"
)

// ReceiverInterceptorOption can be used to configure ReceiverInterceptor
type ReceiverInterceptorOption func(d *ReceiverInterceptor) error

// Log sets a logger for the interceptor
func Log(log logging.LeveledLogger) ReceiverInterceptorOption {
return func(d *ReceiverInterceptor) error {
d.log = log
return nil
}
}
10 changes: 10 additions & 0 deletions pkg/jitterbuffer/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,13 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
}
return nil, ErrNotFound
}

// Clear will empty a PriorityQueue
func (q *PriorityQueue) Clear() {
next := q.next
q.length = 0
for next != nil {
next.prev = nil
next = next.next
}
}
15 changes: 15 additions & 0 deletions pkg/jitterbuffer/priority_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,18 @@ func TestPriorityQueue_Find(t *testing.T) {
_, err = packets.Find(1001)
assert.Error(t, err)
}

func TestPriorityQueue_Clean(t *testing.T) {
packets := NewQueue()
packets.Clear()
packets.Push(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1000,
Timestamp: 5,
SSRC: 5,
},
Payload: []uint8{0xA},
}, 1000)
assert.EqualValues(t, 1, packets.Length())
packets.Clear()
}
110 changes: 110 additions & 0 deletions pkg/jitterbuffer/receiver_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"sync"

"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtp"
)

// InterceptorFactory is a interceptor.Factory for a GeneratorInterceptor
type InterceptorFactory struct {
opts []ReceiverInterceptorOption
}

// NewInterceptor constructs a new ReceiverInterceptor
func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &ReceiverInterceptor{
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"),
buffer: New(),
}

for _, opt := range g.opts {
if err := opt(i); err != nil {
return nil, err
}
}

return i, nil
}

// ReceiverInterceptor places a JitterBuffer in the chain to smooth packet arrival
// and allow for network jitter
//
// The Interceptor is designed to fit in a RemoteStream
// pipeline and buffer incoming packets for a short period (currently
// defaulting to 50 packets) before emitting packets to be consumed by the
// next step in the pipeline.
//
// The caller must ensure they are prepared to handle an
// ErrPopWhileBuffering in the case that insufficient packets have been
// received by the jitter buffer. The caller should retry the operation
// at some point later as the buffer may have been filled in the interim.
//
// The caller should also be aware that an ErrBufferUnderrun may be
// returned in the case that the initial buffering was sufficient and
// playback began but the caller is consuming packets (or they are not
// arriving) quickly enough.
type ReceiverInterceptor struct {
interceptor.NoOp
buffer *JitterBuffer
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
}

// NewInterceptor returns a new InterceptorFactory
func NewInterceptor(opts ...ReceiverInterceptorOption) (*InterceptorFactory, error) {
return &InterceptorFactory{opts}, nil
}

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
buf := make([]byte, len(b))
n, attr, err := reader.Read(buf, a)
if err != nil {
return n, attr, err
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buf); err != nil {
return 0, nil, err
}
i.m.Lock()
defer i.m.Unlock()
i.buffer.Push(packet)
if i.buffer.state == Emitting {
newPkt, err := i.buffer.Pop()
if err != nil {
return 0, nil, err
}
nlen, err := newPkt.MarshalTo(b)
return nlen, attr, err
}
return n, attr, ErrPopWhileBuffering
})
}

// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) {
defer i.wg.Wait()
i.m.Lock()
defer i.m.Unlock()
i.buffer.Clear(true)
}

// Close closes the interceptor
func (i *ReceiverInterceptor) Close() error {
defer i.wg.Wait()
i.m.Lock()
defer i.m.Unlock()
i.buffer.Clear(true)
return nil
}
98 changes: 98 additions & 0 deletions pkg/jitterbuffer/receiver_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package jitterbuffer

import (
"bytes"
"testing"
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/test"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestBufferStart(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewInterceptor(
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.Zero(t, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)
defer func() {
assert.NoError(t, stream.Close())
}()

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 456,
}})
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(0),
}})

// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)
select {
case pkt := <-stream.ReadRTP():
assert.EqualValues(t, nil, pkt)
default:
// No data ready to read, this is what we expect
}
err = i.Close()
assert.NoError(t, err)
assert.Zero(t, buf.Len())
}

func TestReceiverBuffersAndPlaysout(t *testing.T) {
buf := bytes.Buffer{}

factory, err := NewInterceptor(
Log(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
assert.NoError(t, err)

i, err := factory.NewInterceptor("")
assert.NoError(t, err)

assert.EqualValues(t, 0, buf.Len())

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 123456,
ClockRate: 90000,
}, i)

stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{
SenderSSRC: 123,
MediaSSRC: 456,
}})
for s := 0; s < 61; s++ {
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{
SequenceNumber: uint16(s),
}})
}
// Give time for packets to be handled and stream written to.
time.Sleep(50 * time.Millisecond)
for s := 0; s < 10; s++ {
read := <-stream.ReadRTP()
seq := read.Packet.Header.SequenceNumber
assert.EqualValues(t, uint16(s), seq)
}
assert.NoError(t, stream.Close())
err = i.Close()
assert.NoError(t, err)
}

0 comments on commit 72c0be8

Please sign in to comment.