From 3c49840b7412fc7930b5eb59b65068afdc016842 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Tue, 9 Aug 2022 23:24:54 -0400 Subject: [PATCH] Add playout delay header interceptor This interceptor adds the playout delay header extension following http://www.webrtc.org/experiments/rtp-hdrext/playout-delay --- AUTHORS.txt | 1 + .../header_extension_interceptor.go | 65 ++++++++++++++++++ .../header_extension_interceptor_test.go | 66 +++++++++++++++++++ pkg/playoutdelay/playout_delay.go | 10 +++ 4 files changed, 142 insertions(+) create mode 100644 pkg/playoutdelay/header_extension_interceptor.go create mode 100644 pkg/playoutdelay/header_extension_interceptor_test.go create mode 100644 pkg/playoutdelay/playout_delay.go diff --git a/AUTHORS.txt b/AUTHORS.txt index 83d90647..2bea2e1e 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -15,5 +15,6 @@ boks1971 David Zhao Jonathan Müller Kevin Caffrey +Kevin Wang Mathis Engelbart Sean DuBois diff --git a/pkg/playoutdelay/header_extension_interceptor.go b/pkg/playoutdelay/header_extension_interceptor.go new file mode 100644 index 00000000..ac0c482c --- /dev/null +++ b/pkg/playoutdelay/header_extension_interceptor.go @@ -0,0 +1,65 @@ +package playoutdelay + +import ( + "time" + + "github.com/pion/interceptor" + "github.com/pion/rtp" +) + +// HeaderExtensionInterceptorFactory is a interceptor.Factory for a HeaderExtensionInterceptor +type HeaderExtensionInterceptorFactory struct{} + +const ( + playoutDelayMaxMs = 40950 +) + +// NewInterceptor constructs a new HeaderExtensionInterceptor +func (h *HeaderExtensionInterceptorFactory) NewInterceptor(id string, minDelay, maxDelay time.Duration) (interceptor.Interceptor, error) { + if minDelay.Milliseconds() < 0 || minDelay.Milliseconds() > playoutDelayMaxMs || maxDelay.Milliseconds() < 0 || maxDelay.Milliseconds() > playoutDelayMaxMs { + return nil, errPlayoutDelayInvalidValue + } + return &HeaderExtensionInterceptor{minDelay: uint16(minDelay.Milliseconds() / 10), maxDelay: uint16(maxDelay.Milliseconds() / 10)}, nil +} + +// NewHeaderExtensionInterceptor returns a HeaderExtensionInterceptorFactory +func NewHeaderExtensionInterceptor() (*HeaderExtensionInterceptorFactory, error) { + return &HeaderExtensionInterceptorFactory{}, nil +} + +// HeaderExtensionInterceptor adds transport wide sequence numbers as header extension to each RTP packet +type HeaderExtensionInterceptor struct { + interceptor.NoOp + minDelay, maxDelay uint16 +} + +const playoutDelayURI = "http://www.webrtc.org/experiments/rtp-hdrext/playout-delay" + +// BindLocalStream returns a writer that adds a rtp.PlayoutDelayExtension +// header with increasing sequence numbers to each outgoing packet. +func (h *HeaderExtensionInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter { + var hdrExtID uint8 + for _, e := range info.RTPHeaderExtensions { + if e.URI == playoutDelayURI { + hdrExtID = uint8(e.ID) + break + } + } + if hdrExtID == 0 { // Don't add header extension if ID is 0, because 0 is an invalid extension ID + return writer + } + return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) { + ext, err := (&rtp.PlayoutDelayExtension{ + minDelay: h.minDelay, + maxDelay: h.maxDelay, + }).Marshal() + if err != nil { + return 0, err + } + err = header.SetExtension(hdrExtID, ext) + if err != nil { + return 0, err + } + return writer.Write(header, payload, attributes) + }) +} diff --git a/pkg/playoutdelay/header_extension_interceptor_test.go b/pkg/playoutdelay/header_extension_interceptor_test.go new file mode 100644 index 00000000..1d3d6116 --- /dev/null +++ b/pkg/playoutdelay/header_extension_interceptor_test.go @@ -0,0 +1,66 @@ +package playoutdelay + +import ( + "sync" + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/test" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +func TestHeaderExtensionInterceptor(t *testing.T) { + t.Run("add playout delay to each packet", func(t *testing.T) { + factory, err := NewHeaderExtensionInterceptor() + assert.NoError(t, err) + + inter, err := factory.NewInterceptor("", 10*time.Millisecond, 20*time.Millisecond) + assert.NoError(t, err) + + pChan := make(chan *rtp.Packet, 10*5) + go func() { + // start some parallel streams using the same interceptor to test for race conditions + var wg sync.WaitGroup + num := 10 + wg.Add(num) + for i := 0; i < num; i++ { + go func(ch chan *rtp.Packet, id uint16) { + stream := test.NewMockStream(&interceptor.StreamInfo{RTPHeaderExtensions: []interceptor.RTPHeaderExtension{ + { + URI: playoutDelayURI, + ID: 1, + }, + }}, inter) + defer func() { + wg.Done() + assert.NoError(t, stream.Close()) + }() + + for _, seqNum := range []uint16{id * 1, id * 2, id * 3, id * 4, id * 5} { + assert.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})) + select { + case p := <-stream.WrittenRTP(): + assert.Equal(t, seqNum, p.SequenceNumber) + ch <- p + case <-time.After(10 * time.Millisecond): + panic("written rtp packet not found") + } + } + }(pChan, uint16(i+1)) + } + wg.Wait() + close(pChan) + }() + + for p := range pChan { + extensionHeader := p.GetExtension(1) + ext := &rtp.PlayoutDelayExtension{} + err = ext.Unmarshal(extensionHeader) + assert.NoError(t, err) + assert.Equal(t, uint16(1), ext.minDelay) + assert.Equal(t, uint16(2), ext.maxDelay) + } + }) +} diff --git a/pkg/playoutdelay/playout_delay.go b/pkg/playoutdelay/playout_delay.go new file mode 100644 index 00000000..d2831465 --- /dev/null +++ b/pkg/playoutdelay/playout_delay.go @@ -0,0 +1,10 @@ +// Package playoutdelay implements the playout delay header extension. +package playoutdelay + +import ( + "errors" +) + +var ( + errPlayoutDelayInvalidValue = errors.New("invalid playout delay value") +)