From 72c0be86db35deb9b0a7361063314923a3d9a68c Mon Sep 17 00:00:00 2001 From: Rob Elsner Date: Thu, 11 Apr 2024 14:37:29 -0400 Subject: [PATCH] JitterBuffer: Add a JitterBuffer-based Interceptor The JitterBufferInterceptor is designed to fit in a RemoteStream pipeline and buffer incoming packets for a short period (currently defaulting to 50 packets) before emitting packets to be consumed by the next step in the pipeline. The caller must ensure they are prepared to handle an ErrPopWhileBuffering in the case that insufficient packets have been received by the jitter buffer. The caller should retry the operation at some point later as the buffer may have been filled in the interim. The caller should also be aware that an ErrBufferUnderrun may be returned in the case that the initial buffering was sufficient and playback began but the caller is consuming packets (or they are not arriving) quickly enough. --- attributes.go | 4 +- internal/test/mock_stream.go | 7 +- pkg/jitterbuffer/jitter_buffer.go | 13 +++ pkg/jitterbuffer/jitter_buffer_test.go | 17 ++- pkg/jitterbuffer/option.go | 19 +++ pkg/jitterbuffer/priority_queue.go | 10 ++ pkg/jitterbuffer/priority_queue_test.go | 15 +++ pkg/jitterbuffer/receiver_interceptor.go | 110 ++++++++++++++++++ pkg/jitterbuffer/receiver_interceptor_test.go | 98 ++++++++++++++++ 9 files changed, 288 insertions(+), 5 deletions(-) create mode 100644 pkg/jitterbuffer/option.go create mode 100644 pkg/jitterbuffer/receiver_interceptor.go create mode 100644 pkg/jitterbuffer/receiver_interceptor_test.go diff --git a/attributes.go b/attributes.go index d7936d52..8b6d0f5c 100644 --- a/attributes.go +++ b/attributes.go @@ -33,7 +33,7 @@ func (a Attributes) Set(key interface{}, val interface{}) { } // GetRTPHeader gets the RTP header if present. If it is not present, it will be -// unmarshalled from the raw byte slice and stored in the attribtues. +// unmarshalled from the raw byte slice and stored in the attributes. func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) { if val, ok := a[rtpHeaderKey]; ok { if header, ok := val.(*rtp.Header); ok { @@ -50,7 +50,7 @@ func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) { } // GetRTCPPackets gets the RTCP packets if present. If the packet slice is not -// present, it will be unmarshaled from the raw byte slice and stored in the +// present, it will be unmarshalled from the raw byte slice and stored in the // attributes. func (a Attributes) GetRTCPPackets(raw []byte) ([]rtcp.Packet, error) { if val, ok := a[rtcpPacketsKey]; ok { diff --git a/internal/test/mock_stream.go b/internal/test/mock_stream.go index 1b8641b4..bf96e31b 100644 --- a/internal/test/mock_stream.go +++ b/internal/test/mock_stream.go @@ -129,6 +129,9 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc for { i, _, err := s.rtpReader.Read(buf, interceptor.Attributes{}) if err != nil { + if err.Error() == "attempt to pop while buffering" { + continue + } if errors.Is(err, io.EOF) { s.rtpInModified <- RTPWithError{Err: err} } @@ -160,12 +163,12 @@ func (s *MockStream) WriteRTP(p *rtp.Packet) error { return err } -// ReceiveRTCP schedules a new rtcp batch, so it can be read be the stream +// ReceiveRTCP schedules a new rtcp batch, so it can be read by the stream func (s *MockStream) ReceiveRTCP(pkts []rtcp.Packet) { s.rtcpIn <- pkts } -// ReceiveRTP schedules a rtp packet, so it can be read be the stream +// ReceiveRTP schedules a rtp packet, so it can be read by the stream func (s *MockStream) ReceiveRTP(packet *rtp.Packet) { s.rtpIn <- packet } diff --git a/pkg/jitterbuffer/jitter_buffer.go b/pkg/jitterbuffer/jitter_buffer.go index f2f60919..976ea763 100644 --- a/pkg/jitterbuffer/jitter_buffer.go +++ b/pkg/jitterbuffer/jitter_buffer.go @@ -267,3 +267,16 @@ func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) { jb.updateState() return packet, nil } + +// Clear will empty the buffer and optionally reset the state +func (jb *JitterBuffer) Clear(resetState bool) { + jb.mutex.Lock() + defer jb.mutex.Unlock() + jb.packets.Clear() + if resetState { + jb.lastSequence = 0 + jb.state = Buffering + jb.stats = Stats{0, 0, 0} + jb.minStartCount = 50 + } +} diff --git a/pkg/jitterbuffer/jitter_buffer_test.go b/pkg/jitterbuffer/jitter_buffer_test.go index 253d3e72..205e6104 100644 --- a/pkg/jitterbuffer/jitter_buffer_test.go +++ b/pkg/jitterbuffer/jitter_buffer_test.go @@ -25,7 +25,6 @@ func TestJitterBuffer(t *testing.T) { jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}}) - assert.Equal(jb.lastSequence, uint16(5012)) assert.Equal(jb.stats.outOfOrderCount, uint32(1)) assert.Equal(jb.packets.Length(), uint16(4)) assert.Equal(jb.lastSequence, uint16(5012)) @@ -220,4 +219,20 @@ func TestJitterBuffer(t *testing.T) { assert.NotNil(pkt) } }) + + t.Run("Allows clearing the buffer", func(*testing.T) { + jb := New() + jb.Clear(false) + + assert.Equal(jb.lastSequence, uint16(0)) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) + jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) + + assert.Equal(jb.lastSequence, uint16(5002)) + jb.Clear(true) + assert.Equal(jb.lastSequence, uint16(0)) + assert.Equal(jb.stats.outOfOrderCount, uint32(0)) + assert.Equal(jb.packets.Length(), uint16(0)) + }) } diff --git a/pkg/jitterbuffer/option.go b/pkg/jitterbuffer/option.go new file mode 100644 index 00000000..9a33c22e --- /dev/null +++ b/pkg/jitterbuffer/option.go @@ -0,0 +1,19 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "github.com/pion/logging" +) + +// ReceiverInterceptorOption can be used to configure ReceiverInterceptor +type ReceiverInterceptorOption func(d *ReceiverInterceptor) error + +// Log sets a logger for the interceptor +func Log(log logging.LeveledLogger) ReceiverInterceptorOption { + return func(d *ReceiverInterceptor) error { + d.log = log + return nil + } +} diff --git a/pkg/jitterbuffer/priority_queue.go b/pkg/jitterbuffer/priority_queue.go index 50e70c81..f6d7d93b 100644 --- a/pkg/jitterbuffer/priority_queue.go +++ b/pkg/jitterbuffer/priority_queue.go @@ -177,3 +177,13 @@ func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) { } return nil, ErrNotFound } + +// Clear will empty a PriorityQueue +func (q *PriorityQueue) Clear() { + next := q.next + q.length = 0 + for next != nil { + next.prev = nil + next = next.next + } +} diff --git a/pkg/jitterbuffer/priority_queue_test.go b/pkg/jitterbuffer/priority_queue_test.go index d28cc720..7fb2a7a6 100644 --- a/pkg/jitterbuffer/priority_queue_test.go +++ b/pkg/jitterbuffer/priority_queue_test.go @@ -121,3 +121,18 @@ func TestPriorityQueue_Find(t *testing.T) { _, err = packets.Find(1001) assert.Error(t, err) } + +func TestPriorityQueue_Clean(t *testing.T) { + packets := NewQueue() + packets.Clear() + packets.Push(&rtp.Packet{ + Header: rtp.Header{ + SequenceNumber: 1000, + Timestamp: 5, + SSRC: 5, + }, + Payload: []uint8{0xA}, + }, 1000) + assert.EqualValues(t, 1, packets.Length()) + packets.Clear() +} diff --git a/pkg/jitterbuffer/receiver_interceptor.go b/pkg/jitterbuffer/receiver_interceptor.go new file mode 100644 index 00000000..b4c032b9 --- /dev/null +++ b/pkg/jitterbuffer/receiver_interceptor.go @@ -0,0 +1,110 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "sync" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtp" +) + +// InterceptorFactory is a interceptor.Factory for a GeneratorInterceptor +type InterceptorFactory struct { + opts []ReceiverInterceptorOption +} + +// NewInterceptor constructs a new ReceiverInterceptor +func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { + i := &ReceiverInterceptor{ + close: make(chan struct{}), + log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"), + buffer: New(), + } + + for _, opt := range g.opts { + if err := opt(i); err != nil { + return nil, err + } + } + + return i, nil +} + +// ReceiverInterceptor places a JitterBuffer in the chain to smooth packet arrival +// and allow for network jitter +// +// The Interceptor is designed to fit in a RemoteStream +// pipeline and buffer incoming packets for a short period (currently +// defaulting to 50 packets) before emitting packets to be consumed by the +// next step in the pipeline. +// +// The caller must ensure they are prepared to handle an +// ErrPopWhileBuffering in the case that insufficient packets have been +// received by the jitter buffer. The caller should retry the operation +// at some point later as the buffer may have been filled in the interim. +// +// The caller should also be aware that an ErrBufferUnderrun may be +// returned in the case that the initial buffering was sufficient and +// playback began but the caller is consuming packets (or they are not +// arriving) quickly enough. +type ReceiverInterceptor struct { + interceptor.NoOp + buffer *JitterBuffer + m sync.Mutex + wg sync.WaitGroup + close chan struct{} + log logging.LeveledLogger +} + +// NewInterceptor returns a new InterceptorFactory +func NewInterceptor(opts ...ReceiverInterceptorOption) (*InterceptorFactory, error) { + return &InterceptorFactory{opts}, nil +} + +// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method +// will be called once per rtp packet. +func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { + return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + buf := make([]byte, len(b)) + n, attr, err := reader.Read(buf, a) + if err != nil { + return n, attr, err + } + packet := &rtp.Packet{} + if err := packet.Unmarshal(buf); err != nil { + return 0, nil, err + } + i.m.Lock() + defer i.m.Unlock() + i.buffer.Push(packet) + if i.buffer.state == Emitting { + newPkt, err := i.buffer.Pop() + if err != nil { + return 0, nil, err + } + nlen, err := newPkt.MarshalTo(b) + return nlen, attr, err + } + return n, attr, ErrPopWhileBuffering + }) +} + +// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. +func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) { + defer i.wg.Wait() + i.m.Lock() + defer i.m.Unlock() + i.buffer.Clear(true) +} + +// Close closes the interceptor +func (i *ReceiverInterceptor) Close() error { + defer i.wg.Wait() + i.m.Lock() + defer i.m.Unlock() + i.buffer.Clear(true) + return nil +} diff --git a/pkg/jitterbuffer/receiver_interceptor_test.go b/pkg/jitterbuffer/receiver_interceptor_test.go new file mode 100644 index 00000000..58685966 --- /dev/null +++ b/pkg/jitterbuffer/receiver_interceptor_test.go @@ -0,0 +1,98 @@ +// SPDX-FileCopyrightText: 2023 The Pion community +// SPDX-License-Identifier: MIT + +package jitterbuffer + +import ( + "bytes" + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestBufferStart(t *testing.T) { + buf := bytes.Buffer{} + + factory, err := NewInterceptor( + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("") + assert.NoError(t, err) + + assert.Zero(t, buf.Len()) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + defer func() { + assert.NoError(t, stream.Close()) + }() + + stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + SenderSSRC: 123, + MediaSSRC: 456, + }}) + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(0), + }}) + + // Give time for packets to be handled and stream written to. + time.Sleep(50 * time.Millisecond) + select { + case pkt := <-stream.ReadRTP(): + assert.EqualValues(t, nil, pkt) + default: + // No data ready to read, this is what we expect + } + err = i.Close() + assert.NoError(t, err) + assert.Zero(t, buf.Len()) +} + +func TestReceiverBuffersAndPlaysout(t *testing.T) { + buf := bytes.Buffer{} + + factory, err := NewInterceptor( + Log(logging.NewDefaultLoggerFactory().NewLogger("test")), + ) + assert.NoError(t, err) + + i, err := factory.NewInterceptor("") + assert.NoError(t, err) + + assert.EqualValues(t, 0, buf.Len()) + + stream := test.NewMockStream(&interceptor.StreamInfo{ + SSRC: 123456, + ClockRate: 90000, + }, i) + + stream.ReceiveRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{ + SenderSSRC: 123, + MediaSSRC: 456, + }}) + for s := 0; s < 61; s++ { + stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{ + SequenceNumber: uint16(s), + }}) + } + // Give time for packets to be handled and stream written to. + time.Sleep(50 * time.Millisecond) + for s := 0; s < 10; s++ { + read := <-stream.ReadRTP() + seq := read.Packet.Header.SequenceNumber + assert.EqualValues(t, uint16(s), seq) + } + assert.NoError(t, stream.Close()) + err = i.Close() + assert.NoError(t, err) +}