Skip to content

Commit

Permalink
Add interceptor to aggregate CCFB reports
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Jan 27, 2025
1 parent c06f448 commit 89e56ac
Show file tree
Hide file tree
Showing 11 changed files with 1,338 additions and 4 deletions.
9 changes: 5 additions & 4 deletions internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type RTPWithError struct {
// RTCPWithError is used to send a batch of rtcp packets or an error on a channel
type RTCPWithError struct {
Packets []rtcp.Packet
Attr interceptor.Attributes
Err error
}

Expand Down Expand Up @@ -107,21 +108,21 @@ func NewMockStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Moc
go func() {
buf := make([]byte, 1500)
for {
i, _, err := s.rtcpReader.Read(buf, interceptor.Attributes{})
i, attr, err := s.rtcpReader.Read(buf, interceptor.Attributes{})
if err != nil {
if !errors.Is(err, io.EOF) {
s.rtcpInModified <- RTCPWithError{Err: err}
s.rtcpInModified <- RTCPWithError{Attr: attr, Err: err}

Check warning on line 114 in internal/test/mock_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock_stream.go#L114

Added line #L114 was not covered by tests
}
return
}

pkts, err := rtcp.Unmarshal(buf[:i])
if err != nil {
s.rtcpInModified <- RTCPWithError{Err: err}
s.rtcpInModified <- RTCPWithError{Attr: attr, Err: err}
return
}

s.rtcpInModified <- RTCPWithError{Packets: pkts}
s.rtcpInModified <- RTCPWithError{Attr: attr, Packets: pkts}
}
}()
go func() {
Expand Down
59 changes: 59 additions & 0 deletions pkg/ccfb/ccfb_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package ccfb

Check failure on line 1 in pkg/ccfb/ccfb_receiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

package-comments: should have a package comment (revive)

import (
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
)

type acknowledgement struct {
seqNr uint16
arrived bool
arrival time.Time
ecn rtcp.ECN
}

func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Time, map[uint32][]acknowledgement) {
if feedback == nil {
return time.Time{}, nil
}
result := map[uint32][]acknowledgement{}
referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts)
for _, rb := range feedback.ReportBlocks {
result[rb.MediaSSRC] = convertMetricBlock(referenceTime, rb.BeginSequence, rb.MetricBlocks)
}
return referenceTime, result
}

func convertMetricBlock(reference time.Time, seqNrOffset uint16, blocks []rtcp.CCFeedbackMetricBlock) []acknowledgement {
reports := make([]acknowledgement, len(blocks))
for i, mb := range blocks {
if mb.Received {
arrival := time.Time{}

/// RFC 8888 states: If the measurement is unavailable or if the
//arrival time of the RTP packet is after the time represented by

Check failure on line 36 in pkg/ccfb/ccfb_receiver.go

View workflow job for this annotation

GitHub Actions / lint / Go

commentFormatting: put a space between `//` and comment text (gocritic)
//the RTS field, then an ATO value of 0x1FFF MUST be reported for
//the packet. In that case, we set a zero time.Time value.
if mb.ArrivalTimeOffset != 0x1FFF {
delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second))
arrival = reference.Add(-delta)
}
reports[i] = acknowledgement{
seqNr: seqNrOffset + uint16(i),
arrived: true,
arrival: arrival,
ecn: mb.ECN,
}
} else {
reports[i] = acknowledgement{
seqNr: seqNrOffset + uint16(i),
arrived: false,
arrival: time.Time{},
ecn: 0,
}
}
}
return reports
}
193 changes: 193 additions & 0 deletions pkg/ccfb/ccfb_receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package ccfb

import (
"fmt"
"testing"
"time"

"github.com/pion/interceptor/internal/ntp"
"github.com/pion/rtcp"
"github.com/stretchr/testify/assert"
)

func TestConvertCCFB(t *testing.T) {
timeZero := time.Now()
cases := []struct {
ts time.Time
feedback *rtcp.CCFeedbackReport
expect map[uint32][]acknowledgement
expectTS time.Time
}{
{},
{
ts: timeZero.Add(2 * time.Second),
feedback: &rtcp.CCFeedbackReport{
SenderSSRC: 1,
ReportBlocks: []rtcp.CCFeedbackReportBlock{
{
MediaSSRC: 2,
BeginSequence: 17,
MetricBlocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
},
},
},
ReportTimestamp: ntp.ToNTP32(timeZero.Add(time.Second)),
},
expect: map[uint32][]acknowledgement{
2: []acknowledgement{
{
seqNr: 17,
arrived: true,
arrival: timeZero.Add(500 * time.Millisecond),
ecn: 0,
},
},
},
expectTS: timeZero.Add(time.Second),
},
}
for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
resTS, res := convertCCFB(tc.ts, tc.feedback)

assert.InDelta(t, tc.expectTS.UnixNano(), resTS.UnixNano(), float64(time.Millisecond.Nanoseconds()))

// Can't directly check equality since arrival timestamp conversions
// may be slightly off due to ntp conversions.
assert.Equal(t, len(tc.expect), len(res))
for i, acks := range tc.expect {
for j, ack := range acks {
assert.Equal(t, ack.seqNr, res[i][j].seqNr)
assert.Equal(t, ack.arrived, res[i][j].arrived)
assert.Equal(t, ack.ecn, res[i][j].ecn)
assert.InDelta(t, ack.arrival.UnixNano(), res[i][j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds()))
}
}
})
}
}

func TestConvertMetricBlock(t *testing.T) {
cases := []struct {
ts time.Time
reference time.Time
seqNrOffset uint16
blocks []rtcp.CCFeedbackMetricBlock
expected []acknowledgement
}{
{
ts: time.Time{},
reference: time.Time{},
seqNrOffset: 0,
blocks: []rtcp.CCFeedbackMetricBlock{},
expected: []acknowledgement{},
},
{
ts: time.Time{}.Add(2 * time.Second),
reference: time.Time{}.Add(time.Second),
seqNrOffset: 3,
blocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
{
Received: false,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0,
},
},
expected: []acknowledgement{
{
seqNr: 3,
arrived: true,
arrival: time.Time{}.Add(500 * time.Millisecond),
ecn: 0,
},
{
seqNr: 4,
arrived: false,
arrival: time.Time{},
ecn: 0,
},
{
seqNr: 5,
arrived: true,
arrival: time.Time{}.Add(time.Second),
ecn: 0,
},
},
},
{
ts: time.Time{}.Add(2 * time.Second),
reference: time.Time{}.Add(time.Second),
seqNrOffset: 3,
blocks: []rtcp.CCFeedbackMetricBlock{
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 512,
},
{
Received: false,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0,
},
{
Received: true,
ECN: 0,
ArrivalTimeOffset: 0x1FFF,
},
},
expected: []acknowledgement{
{
seqNr: 3,
arrived: true,
arrival: time.Time{}.Add(500 * time.Millisecond),
ecn: 0,
},
{
seqNr: 4,
arrived: false,
arrival: time.Time{},
ecn: 0,
},
{
seqNr: 5,
arrived: true,
arrival: time.Time{}.Add(time.Second),
ecn: 0,
},
{
seqNr: 6,
arrived: true,
arrival: time.Time{},
ecn: 0,
},
},
},
}

for i, tc := range cases {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
res := convertMetricBlock(tc.reference, tc.seqNrOffset, tc.blocks)
assert.Equal(t, tc.expected, res)
})
}
}
29 changes: 29 additions & 0 deletions pkg/ccfb/duplicate_ack_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ccfb

// DuplicateAckFilter is a helper to remove duplicate acks from a Report.
type DuplicateAckFilter struct {
highestAckedBySSRC map[uint32]int64
}

// NewDuplicateAckFilter creates a new DuplicateAckFilter
func NewDuplicateAckFilter() *DuplicateAckFilter {
return &DuplicateAckFilter{
highestAckedBySSRC: make(map[uint32]int64),
}
}

// Filter filters duplicate acks. It filters out all acks for packets with a
// sequence number smaller than the highest seen sequence number for each SSRC.
func (f *DuplicateAckFilter) Filter(reports Report) {
for ssrc, prs := range reports.SSRCToPacketReports {
n := 0
for _, report := range prs {
if highest, ok := f.highestAckedBySSRC[ssrc]; !ok || report.SeqNr > highest {
f.highestAckedBySSRC[ssrc] = report.SeqNr
prs[n] = report
n++
}
}
reports.SSRCToPacketReports[ssrc] = prs[:n]
}
}
Loading

0 comments on commit 89e56ac

Please sign in to comment.