diff --git a/pkg/rtpfb/acknowledgement.go b/pkg/rtpfb/acknowledgement.go new file mode 100644 index 00000000..4abb0de7 --- /dev/null +++ b/pkg/rtpfb/acknowledgement.go @@ -0,0 +1,17 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "time" + + "github.com/pion/rtcp" +) + +type acknowledgement struct { + sequenceNumber uint16 + arrived bool + arrival time.Time + ecn rtcp.ECN +} diff --git a/pkg/rtpfb/ccfb_receiver.go b/pkg/rtpfb/ccfb_receiver.go new file mode 100644 index 00000000..4d680064 --- /dev/null +++ b/pkg/rtpfb/ccfb_receiver.go @@ -0,0 +1,70 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "time" + + "github.com/pion/interceptor/internal/ntp" + "github.com/pion/rtcp" +) + +func convertCCFB(ts time.Time, feedback *rtcp.CCFeedbackReport) (time.Duration, map[uint32][]acknowledgement) { + if feedback == nil { + return 0, nil + } + result := map[uint32][]acknowledgement{} + referenceTime := ntp.ToTime32(feedback.ReportTimestamp, ts) + latestArrival := time.Time{} + for _, rb := range feedback.ReportBlocks { + var la time.Time + la, result[rb.MediaSSRC] = convertMetricBlock(referenceTime, rb.BeginSequence, rb.MetricBlocks) + if la.After(latestArrival) { + latestArrival = la + } + } + + return referenceTime.Sub(latestArrival), result +} + +func convertMetricBlock( + reference time.Time, + seqNrOffset uint16, + blocks []rtcp.CCFeedbackMetricBlock, +) (time.Time, []acknowledgement) { + reports := make([]acknowledgement, len(blocks)) + latestArrival := time.Time{} + for i, mb := range blocks { + if mb.Received { + arrival := time.Time{} + + // RFC 8888 states: If the measurement is unavailable or if the + // arrival time of the RTP packet is after the time represented by + // the RTS field, then an ATO value of 0x1FFF MUST be reported for + // the packet. In that case, we set a zero time.Time value. + if mb.ArrivalTimeOffset != 0x1FFF { + delta := time.Duration((float64(mb.ArrivalTimeOffset) / 1024.0) * float64(time.Second)) + arrival = reference.Add(-delta) + if arrival.After(latestArrival) { + latestArrival = arrival + } + } + reports[i] = acknowledgement{ + sequenceNumber: seqNrOffset + uint16(i), // nolint:gosec + arrived: true, + arrival: arrival, + ecn: mb.ECN, + } + } else { + reports[i] = acknowledgement{ + sequenceNumber: seqNrOffset + uint16(i), // nolint:gosec + arrived: false, + arrival: time.Time{}, + ecn: 0, + } + } + } + + return latestArrival, reports +} diff --git a/pkg/rtpfb/ccfb_receiver_test.go b/pkg/rtpfb/ccfb_receiver_test.go new file mode 100644 index 00000000..a02d60b1 --- /dev/null +++ b/pkg/rtpfb/ccfb_receiver_test.go @@ -0,0 +1,200 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "fmt" + "testing" + "time" + + "github.com/pion/interceptor/internal/ntp" + "github.com/pion/rtcp" + "github.com/stretchr/testify/assert" +) + +func TestConvertCCFB(t *testing.T) { + timeZero := time.Now() + cases := []struct { + ts time.Time + feedback *rtcp.CCFeedbackReport + expect map[uint32][]acknowledgement + expectAckDelay time.Duration + }{ + {}, + { + ts: timeZero.Add(2 * time.Second), + feedback: &rtcp.CCFeedbackReport{ + SenderSSRC: 1, + ReportBlocks: []rtcp.CCFeedbackReportBlock{ + { + MediaSSRC: 2, + BeginSequence: 17, + MetricBlocks: []rtcp.CCFeedbackMetricBlock{ + { + Received: true, + ECN: 0, + ArrivalTimeOffset: 512, + }, + }, + }, + }, + ReportTimestamp: ntp.ToNTP32(timeZero.Add(time.Second)), + }, + expect: map[uint32][]acknowledgement{ + 2: { + { + sequenceNumber: 17, + arrived: true, + arrival: timeZero.Add(500 * time.Millisecond), + ecn: 0, + }, + }, + }, + expectAckDelay: 500 * time.Millisecond, + }, + } + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + ackDelay, res := convertCCFB(tc.ts, tc.feedback) + + assert.Equal(t, tc.expectAckDelay, ackDelay) + + // Can't directly check equality since arrival timestamp conversions + // may be slightly off due to ntp conversions. + assert.Equal(t, len(tc.expect), len(res)) + for i, acks := range tc.expect { + for j, ack := range acks { + assert.Equal(t, ack.sequenceNumber, res[i][j].sequenceNumber) + assert.Equal(t, ack.arrived, res[i][j].arrived) + assert.Equal(t, ack.ecn, res[i][j].ecn) + assert.InDelta(t, ack.arrival.UnixNano(), res[i][j].arrival.UnixNano(), float64(time.Millisecond.Nanoseconds())) + } + } + }) + } +} + +func TestConvertMetricBlock(t *testing.T) { + cases := []struct { + ts time.Time + reference time.Time + seqNrOffset uint16 + blocks []rtcp.CCFeedbackMetricBlock + expected []acknowledgement + expectedLatestArrival time.Time + }{ + { + ts: time.Time{}, + reference: time.Time{}, + seqNrOffset: 0, + blocks: []rtcp.CCFeedbackMetricBlock{}, + expected: []acknowledgement{}, + }, + { + ts: time.Time{}.Add(2 * time.Second), + reference: time.Time{}.Add(time.Second), + seqNrOffset: 3, + blocks: []rtcp.CCFeedbackMetricBlock{ + { + Received: true, + ECN: 0, + ArrivalTimeOffset: 512, + }, + { + Received: false, + ECN: 0, + ArrivalTimeOffset: 0, + }, + { + Received: true, + ECN: 0, + ArrivalTimeOffset: 0, + }, + }, + expected: []acknowledgement{ + { + sequenceNumber: 3, + arrived: true, + arrival: time.Time{}.Add(500 * time.Millisecond), + ecn: 0, + }, + { + sequenceNumber: 4, + arrived: false, + arrival: time.Time{}, + ecn: 0, + }, + { + sequenceNumber: 5, + arrived: true, + arrival: time.Time{}.Add(time.Second), + ecn: 0, + }, + }, + expectedLatestArrival: time.Time{}.Add(time.Second), + }, + { + ts: time.Time{}.Add(2 * time.Second), + reference: time.Time{}.Add(time.Second), + seqNrOffset: 3, + blocks: []rtcp.CCFeedbackMetricBlock{ + { + Received: true, + ECN: 0, + ArrivalTimeOffset: 512, + }, + { + Received: false, + ECN: 0, + ArrivalTimeOffset: 0, + }, + { + Received: true, + ECN: 0, + ArrivalTimeOffset: 0, + }, + { + Received: true, + ECN: 0, + ArrivalTimeOffset: 0x1FFF, + }, + }, + expected: []acknowledgement{ + { + sequenceNumber: 3, + arrived: true, + arrival: time.Time{}.Add(500 * time.Millisecond), + ecn: 0, + }, + { + sequenceNumber: 4, + arrived: false, + arrival: time.Time{}, + ecn: 0, + }, + { + sequenceNumber: 5, + arrived: true, + arrival: time.Time{}.Add(time.Second), + ecn: 0, + }, + { + sequenceNumber: 6, + arrived: true, + arrival: time.Time{}, + ecn: 0, + }, + }, + expectedLatestArrival: time.Time{}.Add(time.Second), + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + ela, res := convertMetricBlock(tc.reference, tc.seqNrOffset, tc.blocks) + assert.Equal(t, tc.expected, res) + assert.Equal(t, tc.expectedLatestArrival, ela) + }) + } +} diff --git a/pkg/rtpfb/history.go b/pkg/rtpfb/history.go new file mode 100644 index 00000000..9ba694ae --- /dev/null +++ b/pkg/rtpfb/history.go @@ -0,0 +1,165 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "time" + + "github.com/pion/rtcp" +) + +type ssrcSequenceNumber struct { + ssrc uint32 + sequenceNumber uint16 +} + +// history keeps a global sequence number for all outgoing packets, called +// counter, to avoid confusion with transport wide sequence numbers from TWCC. +// Packets will be mapped to the counter either by their TWCC sequence number or +// by their combination of RTP sequence number and SSRC. When feedback arrives, +// calls to onFeedback will update the status of each packet included in the +// report. buildReport can be used to create a new report including all packets +// from nextReport to highestAcked. +type history struct { + counter uint64 + twccToCounter map[uint16]uint64 + ssrcSeqNrToCounter map[ssrcSequenceNumber]uint64 + + packets map[uint64]*PacketReport + highestAcked uint64 + nextReport uint64 + + cleanUntil uint64 +} + +func newHistory() *history { + return &history{ + counter: 0, + twccToCounter: map[uint16]uint64{}, + ssrcSeqNrToCounter: map[ssrcSequenceNumber]uint64{}, + packets: make(map[uint64]*PacketReport), + highestAcked: 0, + nextReport: 0, + } +} + +func (h *history) addOutgoing( + ssrc uint32, + rtpSequenceNumber uint16, + isTWCC bool, + twccSequenceNumber uint16, + size int, + departure time.Time, +) { + count := h.counter + h.counter++ + + if isTWCC { + h.twccToCounter[twccSequenceNumber] = count + } else { + h.ssrcSeqNrToCounter[ssrcSequenceNumber{ + ssrc: ssrc, + sequenceNumber: rtpSequenceNumber, + }] = count + } + + h.packets[count] = &PacketReport{ + SSRC: ssrc, + SequenceNumber: count, + RTPSequenceNumber: rtpSequenceNumber, + TWCCSequenceNumber: twccSequenceNumber, + Size: size, + Departure: departure, + Arrived: false, + Arrival: time.Time{}, + ECN: rtcp.ECNNonECT, + } +} + +func (h *history) onFeedback(ts time.Time, counter uint64, ack acknowledgement) (time.Duration, bool) { + p, ok := h.packets[counter] + if !ok { + return 0, false + } + p.Arrived = ack.arrived + if p.Arrived { + h.highestAcked = p.SequenceNumber + } + p.Arrival = ack.arrival + p.ECN = ack.ecn + + return ts.Sub(p.Departure), true +} + +func (h *history) onTWCCFeedback(ts time.Time, ack acknowledgement) (time.Duration, bool) { + counter, ok := h.twccToCounter[ack.sequenceNumber] + if !ok { + // ignore ack for unknown packet + return 0, false + } + + return h.onFeedback(ts, counter, ack) +} + +func (h *history) onCCFBFeedback(ts time.Time, ssrc uint32, ack acknowledgement) (time.Duration, bool) { + counter, ok := h.ssrcSeqNrToCounter[ssrcSequenceNumber{ + ssrc: ssrc, + sequenceNumber: ack.sequenceNumber, + }] + if !ok { + // ignore ack for unknown packet + return 0, false + } + + return h.onFeedback(ts, counter, ack) +} + +// buildReport builds a report containing all packets up to the highest +// acknowledged packet that were not included in a previous report. +// TODO: Implement adaptive re-order window. Packets may arrive out of order. In +// that case, they will be reported as lost. Instead of reporting them lost, we +// could wait for a short time. In some cases, reordered packets will then be +// reported as arrived in the next report. +// +//nolint:godox +func (h *history) buildReport() []PacketReport { + if h.nextReport > h.highestAcked { + return nil + } + res := make([]PacketReport, 0, h.highestAcked-h.nextReport+1) + for i := h.nextReport; i <= h.highestAcked; i++ { + packet, ok := h.packets[i] + if !ok { + // packet dropped from history? + continue + } + res = append(res, *packet) + h.delete(packet) + if packet.SequenceNumber >= h.nextReport { + h.nextReport = packet.SequenceNumber + 1 + } + } + h.cleanBefore(h.nextReport) + + return res +} + +func (h *history) delete(p *PacketReport) { + if p.IsTWCC { + delete(h.twccToCounter, p.TWCCSequenceNumber) + } + delete(h.ssrcSeqNrToCounter, ssrcSequenceNumber{ + ssrc: p.SSRC, + sequenceNumber: p.RTPSequenceNumber, + }) +} + +func (h *history) cleanBefore(counter uint64) { + for i := h.cleanUntil; i < counter; i++ { + if p, ok := h.packets[i]; ok { + h.delete(p) + } + } + h.cleanUntil = counter - 1 +} diff --git a/pkg/rtpfb/history_test.go b/pkg/rtpfb/history_test.go new file mode 100644 index 00000000..0a272ab4 --- /dev/null +++ b/pkg/rtpfb/history_test.go @@ -0,0 +1,130 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestHistory(t *testing.T) { + t.Run("test_ccfb", func(t *testing.T) { + cases := []struct { + packets []uint16 // RTP sequence number + feedbackFirst []acknowledgement + feedbackSecond []acknowledgement + expectedFirst []PacketReport + expectedSecond []PacketReport + }{ + { + packets: []uint16{0, 1, 2, 3}, + feedbackFirst: []acknowledgement{ + {sequenceNumber: 0, arrived: true}, + {sequenceNumber: 1, arrived: true}, + {sequenceNumber: 2, arrived: true}, + }, + expectedFirst: []PacketReport{ + {SSRC: 1, SequenceNumber: 0, RTPSequenceNumber: 0, Arrived: true}, + {SSRC: 1, SequenceNumber: 1, RTPSequenceNumber: 1, Arrived: true}, + {SSRC: 1, SequenceNumber: 2, RTPSequenceNumber: 2, Arrived: true}, + }, + }, + { + packets: []uint16{5, 6, 7, 8, 9}, + feedbackFirst: []acknowledgement{ + {sequenceNumber: 5, arrived: true}, + {sequenceNumber: 6, arrived: false}, + {sequenceNumber: 7, arrived: true}, + }, + expectedFirst: []PacketReport{ + {SSRC: 1, SequenceNumber: 0, RTPSequenceNumber: 5, Arrived: true}, + {SSRC: 1, SequenceNumber: 1, RTPSequenceNumber: 6, Arrived: false}, + {SSRC: 1, SequenceNumber: 2, RTPSequenceNumber: 7, Arrived: true}, + }, + }, + { + packets: []uint16{1, 2, 3, 4, 5}, + feedbackFirst: []acknowledgement{ + {sequenceNumber: 1, arrived: true}, + {sequenceNumber: 2, arrived: true}, + }, + feedbackSecond: []acknowledgement{ + {sequenceNumber: 3, arrived: true}, + {sequenceNumber: 4, arrived: true}, + {sequenceNumber: 5, arrived: true}, + }, + expectedFirst: []PacketReport{ + {SSRC: 1, SequenceNumber: 0, RTPSequenceNumber: 1, Arrived: true}, + {SSRC: 1, SequenceNumber: 1, RTPSequenceNumber: 2, Arrived: true}, + }, + expectedSecond: []PacketReport{ + {SSRC: 1, SequenceNumber: 2, RTPSequenceNumber: 3, Arrived: true}, + {SSRC: 1, SequenceNumber: 3, RTPSequenceNumber: 4, Arrived: true}, + {SSRC: 1, SequenceNumber: 4, RTPSequenceNumber: 5, Arrived: true}, + }, + }, + { + packets: []uint16{1, 2, 3, 4, 5}, + feedbackFirst: []acknowledgement{ + {sequenceNumber: 1, arrived: true}, + {sequenceNumber: 2, arrived: true}, + {sequenceNumber: 3, arrived: true}, + }, + feedbackSecond: []acknowledgement{ + {sequenceNumber: 1, arrived: true}, + {sequenceNumber: 2, arrived: true}, + {sequenceNumber: 3, arrived: true}, + {sequenceNumber: 4, arrived: true}, + }, + expectedFirst: []PacketReport{ + {SSRC: 1, SequenceNumber: 0, RTPSequenceNumber: 1, Arrived: true}, + {SSRC: 1, SequenceNumber: 1, RTPSequenceNumber: 2, Arrived: true}, + {SSRC: 1, SequenceNumber: 2, RTPSequenceNumber: 3, Arrived: true}, + }, + expectedSecond: []PacketReport{ + {SSRC: 1, SequenceNumber: 3, RTPSequenceNumber: 4, Arrived: true}, + }, + }, + { + packets: []uint16{65534, 65535, 0, 1}, + feedbackFirst: []acknowledgement{ + {sequenceNumber: 65534, arrived: true}, + {sequenceNumber: 65535, arrived: true}, + {sequenceNumber: 0, arrived: true}, + {sequenceNumber: 1, arrived: true}, + }, + feedbackSecond: []acknowledgement{}, + expectedFirst: []PacketReport{ + {SSRC: 1, SequenceNumber: 0, RTPSequenceNumber: 65534, Arrived: true}, + {SSRC: 1, SequenceNumber: 1, RTPSequenceNumber: 65535, Arrived: true}, + {SSRC: 1, SequenceNumber: 2, RTPSequenceNumber: 0, Arrived: true}, + {SSRC: 1, SequenceNumber: 3, RTPSequenceNumber: 1, Arrived: true}, + }, + expectedSecond: nil, + }, + } + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + history := newHistory() + for _, p := range tc.packets { + history.addOutgoing(1, p, false, 0, 0, time.Time{}) + } + for _, f := range tc.feedbackFirst { + history.onCCFBFeedback(time.Time{}, 1, f) + } + reports := history.buildReport() + assert.Equal(t, tc.expectedFirst, reports) + + for _, f := range tc.feedbackSecond { + history.onCCFBFeedback(time.Time{}, 1, f) + } + reports = history.buildReport() + assert.Equal(t, tc.expectedSecond, reports) + }) + } + }) +} diff --git a/pkg/rtpfb/interceptor.go b/pkg/rtpfb/interceptor.go new file mode 100644 index 00000000..b5708b6f --- /dev/null +++ b/pkg/rtpfb/interceptor.go @@ -0,0 +1,250 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +// Package rtpfb implements feedback aggregation for CCFB and TWCC packets. +package rtpfb + +import ( + "math" + "sync" + "time" + + "github.com/pion/interceptor" + "github.com/pion/logging" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +const transportCCURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01" + +type ccfbAttributesKeyType uint32 + +// CCFBAttributesKey is the key which can be used to retrieve the Report objects +// from the interceptor.Attributes. +const CCFBAttributesKey ccfbAttributesKeyType = iota + +type packetLog interface { + addOutgoing( + ssrc uint32, + rtpSequenceNumber uint16, + isTWCC bool, + twccSequenceNumber uint16, + size int, + departure time.Time, + ) + onTWCCFeedback(ts time.Time, ack acknowledgement) (time.Duration, bool) + onCCFBFeedback(ts time.Time, ssrc uint32, ack acknowledgement) (time.Duration, bool) + buildReport() []PacketReport +} + +// Option can be used to set initial options on CCFB interceptors. +type Option func(*Interceptor) error + +func timeFactory(f func() time.Time) Option { + return func(i *Interceptor) error { + i.timestamp = f + + return nil + } +} + +func setHistory(pl packetLog) Option { + return func(i *Interceptor) error { + i.history = pl + + return nil + } +} + +// InterceptorFactory is a factory for CCFB interceptors. +type InterceptorFactory struct { + opts []Option +} + +// NewInterceptor returns a new CCFB InterceptorFactory. +func NewInterceptor(opts ...Option) (*InterceptorFactory, error) { + return &InterceptorFactory{ + opts: opts, + }, nil +} + +// NewInterceptor returns a new ccfb.Interceptor. +func (f *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { + in := &Interceptor{ + NoOp: interceptor.NoOp{}, + lock: sync.RWMutex{}, + log: logging.NewDefaultLoggerFactory().NewLogger("ccfb_interceptor"), + timestamp: time.Now, + history: newHistory(), + } + for _, opt := range f.opts { + if err := opt(in); err != nil { + return nil, err + } + } + + return in, nil +} + +// Interceptor implements a congestion control feedback receiver. It keeps track +// of outgoing packets and reads incoming feedback reports (CCFB or TWCC). For +// each incoming feedback report, it will add an entry to the interceptor +// attributes, which can be read from the `RTCPReader` +// (`webrtc.RTPSender.Read`). For each acknowledgement included in the feedback +// report, a PacketReport will be added to the ccfb.Report. +type Interceptor struct { + interceptor.NoOp + lock sync.RWMutex + log logging.LeveledLogger + timestamp func() time.Time + + history packetLog +} + +func (i *Interceptor) bindTWCCStream(twccHdrExtID uint8, writer interceptor.RTPWriter) interceptor.RTPWriter { + return interceptor.RTPWriterFunc(func( + header *rtp.Header, + payload []byte, + attributes interceptor.Attributes, + ) (int, error) { + ts := i.timestamp() + + i.lock.Lock() + defer i.lock.Unlock() + + var twccHdrExt rtp.TransportCCExtension + if err := twccHdrExt.Unmarshal(header.GetExtension(twccHdrExtID)); err != nil { + i.log.Warnf( + "CCFB configured for TWCC, but failed to get TWCC header extension from outgoing packet."+ + "Falling back to saving history for CCFB feedback reports. err: %v", + err, + ) + i.history.addOutgoing(header.SSRC, header.SequenceNumber, false, 0, header.MarshalSize()+len(payload), ts) + + return writer.Write(header, payload, attributes) + } + + i.history.addOutgoing( + header.SSRC, + header.SequenceNumber, + true, + twccHdrExt.TransportSequence, + header.MarshalSize()+len(payload), + ts, + ) + + return writer.Write(header, payload, attributes) + }) +} + +func (i *Interceptor) bindCCFBStream(writer interceptor.RTPWriter) interceptor.RTPWriter { + return interceptor.RTPWriterFunc(func( + header *rtp.Header, + payload []byte, + attributes interceptor.Attributes, + ) (int, error) { + ts := i.timestamp() + + i.lock.Lock() + defer i.lock.Unlock() + + i.history.addOutgoing( + header.SSRC, + header.SequenceNumber, + false, + 0, + header.MarshalSize()+len(payload), + ts, + ) + + return writer.Write(header, payload, attributes) + }) +} + +// BindLocalStream implements interceptor.Interceptor. +func (i *Interceptor) BindLocalStream( + info *interceptor.StreamInfo, + writer interceptor.RTPWriter, +) interceptor.RTPWriter { + var twccHdrExtID uint8 + var useTWCC bool + for _, e := range info.RTPHeaderExtensions { + if e.URI == transportCCURI { + twccHdrExtID = uint8(e.ID) // nolint:gosec + useTWCC = true + + break + } + } + if useTWCC { + return i.bindTWCCStream(twccHdrExtID, writer) + } + + return i.bindCCFBStream(writer) +} + +// BindRTCPReader implements interceptor.Interceptor. +func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader { + return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { + n, attr, err := reader.Read(b, a) + if err != nil { + return n, attr, err + } + ts := i.timestamp() + + if attr == nil { + attr = make(interceptor.Attributes) + } + + pkts, err := attr.GetRTCPPackets(b[:n]) + if err != nil { + return n, attr, err + } + rtt, prs := i.processFeedback(ts, pkts) + + if len(prs) > 0 { + report := Report{ + Arrival: ts, + RTT: rtt, + PacketReports: prs, + } + attr.Set(CCFBAttributesKey, report) + } + + return n, attr, err + }) +} + +//nolint:cyclop +func (i *Interceptor) processFeedback(ts time.Time, pkts []rtcp.Packet) (time.Duration, []PacketReport) { + i.lock.Lock() + defer i.lock.Unlock() + + shortestRTT := time.Duration(math.MaxInt64) + var ackDelay time.Duration + + for _, pkt := range pkts { + switch fb := pkt.(type) { + case *rtcp.CCFeedbackReport: + var acksPerSSRC map[uint32][]acknowledgement + ackDelay, acksPerSSRC = convertCCFB(ts, fb) + for ssrc, acks := range acksPerSSRC { + for _, ack := range acks { + rtt, ok := i.history.onCCFBFeedback(ts, ssrc, ack) + if ok && rtt < shortestRTT { + shortestRTT = rtt + } + } + } + case *rtcp.TransportLayerCC: + for _, ack := range convertTWCC(fb) { + rtt, ok := i.history.onTWCCFeedback(ts, ack) + if ok && rtt < shortestRTT { + shortestRTT = rtt + } + } + } + } + + return shortestRTT - ackDelay, i.history.buildReport() +} diff --git a/pkg/rtpfb/interceptor_test.go b/pkg/rtpfb/interceptor_test.go new file mode 100644 index 00000000..61c16355 --- /dev/null +++ b/pkg/rtpfb/interceptor_test.go @@ -0,0 +1,218 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "fmt" + "testing" + "time" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/internal/ntp" + "github.com/pion/interceptor/internal/test" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/stretchr/testify/assert" +) + +type ackListEntry struct { + ts time.Time + ssrc uint32 + ack acknowledgement +} + +type mockHistory struct { + log []PacketReport + acks []ackListEntry +} + +// addOutgoing implements packetLog. +func (m *mockHistory) addOutgoing( + ssrc uint32, + rtpSequenceNumber uint16, + isTWCC bool, + twccSequenceNumber uint16, + size int, + departure time.Time, +) { + m.log = append(m.log, PacketReport{ + SSRC: ssrc, + RTPSequenceNumber: rtpSequenceNumber, + IsTWCC: isTWCC, + TWCCSequenceNumber: twccSequenceNumber, + Size: size, + Departure: departure, + }) +} + +// buildReport implements packetLog. +func (m *mockHistory) buildReport() []PacketReport { + return nil +} + +// onCCFBFeedback implements packetLog. +func (m *mockHistory) onCCFBFeedback(ts time.Time, ssrc uint32, ack acknowledgement) (time.Duration, bool) { + m.acks = append(m.acks, ackListEntry{ + ts: ts, + ssrc: ssrc, + ack: ack, + }) + + return 0, true +} + +// onTWCCFeedback implements packetLog. +func (m *mockHistory) onTWCCFeedback(ts time.Time, ack acknowledgement) (time.Duration, bool) { + m.acks = append(m.acks, ackListEntry{ + ts: ts, + ssrc: 0, + ack: ack, + }) + + return 0, true +} + +func TestInterceptor(t *testing.T) { + mockTimeStamp := time.Time{}.Add(120 * time.Second) + t.Run("calls_add_outgoing", func(t *testing.T) { + cases := []struct { + twcc bool + packets []uint16 + expected []PacketReport + }{ + { + twcc: false, + packets: []uint16{}, + expected: []PacketReport{}, + }, + { + twcc: false, + packets: []uint16{7, 8, 9}, + expected: []PacketReport{ + {SequenceNumber: 0, RTPSequenceNumber: 7, Size: 12, Departure: mockTimeStamp}, + {SequenceNumber: 0, RTPSequenceNumber: 8, Size: 12, Departure: mockTimeStamp}, + {SequenceNumber: 0, RTPSequenceNumber: 9, Size: 12, Departure: mockTimeStamp}, + }, + }, + { + twcc: true, + packets: []uint16{}, + expected: []PacketReport{}, + }, + { + twcc: true, + packets: []uint16{7, 8, 9}, + expected: []PacketReport{ + {SequenceNumber: 0, RTPSequenceNumber: 7, IsTWCC: true, TWCCSequenceNumber: 7, Size: 20, Departure: mockTimeStamp}, + {SequenceNumber: 0, RTPSequenceNumber: 8, IsTWCC: true, TWCCSequenceNumber: 8, Size: 20, Departure: mockTimeStamp}, + {SequenceNumber: 0, RTPSequenceNumber: 9, IsTWCC: true, TWCCSequenceNumber: 9, Size: 20, Departure: mockTimeStamp}, + }, + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + mh := &mockHistory{ + log: []PacketReport{}, + } + mt := func() time.Time { + return mockTimeStamp + } + f, err := NewInterceptor(timeFactory(mt), setHistory(mh)) + assert.NoError(t, err) + i, err := f.NewInterceptor("") + assert.NoError(t, err) + + info := &interceptor.StreamInfo{} + if tc.twcc { + info.RTPHeaderExtensions = append(info.RTPHeaderExtensions, interceptor.RTPHeaderExtension{ + URI: transportCCURI, + ID: 2, + }) + } + stream := test.NewMockStream(info, i) + + for _, pkt := range tc.packets { + packet := &rtp.Packet{Header: rtp.Header{SequenceNumber: pkt}} + if tc.twcc { + ext := rtp.TransportCCExtension{ + TransportSequence: pkt, + } + var buf []byte + buf, err = ext.Marshal() + assert.NoError(t, err) + err = packet.SetExtension(2, buf) + assert.NoError(t, err) + } + err = stream.WriteRTP(packet) + assert.NoError(t, err) + } + + assert.Equal(t, tc.expected, mh.log) + }) + } + }) + + t.Run("calls_on_feedback", func(t *testing.T) { + cases := []struct { + feedback rtcp.Packet + expected []ackListEntry + }{ + { + feedback: &rtcp.CCFeedbackReport{ + SenderSSRC: 0, + ReportBlocks: []rtcp.CCFeedbackReportBlock{ + { + MediaSSRC: 0, + BeginSequence: 17, + MetricBlocks: []rtcp.CCFeedbackMetricBlock{ + { + Received: true, + ECN: 0, + ArrivalTimeOffset: 0, + }, + }, + }, + }, + ReportTimestamp: 0, + }, + expected: []ackListEntry{ + { + ts: mockTimeStamp, + ssrc: 0, + ack: acknowledgement{ + sequenceNumber: 17, + arrived: true, + arrival: ntp.ToTime32(0, mockTimeStamp), + ecn: 0, + }, + }, + }, + }, + } + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + mh := &mockHistory{ + log: []PacketReport{}, + acks: []ackListEntry{}, + } + mt := func() time.Time { + return mockTimeStamp + } + f, err := NewInterceptor(timeFactory(mt), setHistory(mh)) + assert.NoError(t, err) + i, err := f.NewInterceptor("") + assert.NoError(t, err) + + info := &interceptor.StreamInfo{} + stream := test.NewMockStream(info, i) + + stream.ReceiveRTCP([]rtcp.Packet{tc.feedback}) + <-stream.ReadRTCP() + + assert.Equal(t, tc.expected, mh.acks) + }) + } + }) +} diff --git a/pkg/rtpfb/report.go b/pkg/rtpfb/report.go new file mode 100644 index 00000000..03b49c2d --- /dev/null +++ b/pkg/rtpfb/report.go @@ -0,0 +1,62 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "time" + + "github.com/pion/rtcp" +) + +type PacketReport struct { + // SSRC of the stream the packet was sent on. + SSRC uint32 + + // Sequence number of the packet generated by the rtpfb.Interceptor. The + // sequence number increases by 1 for every outgoing packet independent of + // the stream it is sent on. + SequenceNumber uint64 + + // Sequence number from the RTP header. + RTPSequenceNumber uint16 + + // IsTWCC is true if the packet was tracked by the + // transport-wide-congestion-control header extension instead of the + // combination of SSRC and RTPSequenceNumber. + IsTWCC bool + + // Sequence number from the transport-wide-congestion-control header + // extension. + TWCCSequenceNumber uint16 + + // Size is the size of the packet in bytes. + Size int + + // Arrived indicates if the packet arrived at the receiver. False does not + // necessarily mean the packet was lost, it might still be in transit. + Arrived bool + + // Departure is the departure time of the packet taken at the sender. It + // should be the time measured at the latest possible moment before sending + // the packet. + Departure time.Time + + // Arrival is the arrival time of the packet at the receiver. Arrival and + // Departure do not require synchronized clocks and can therefore not + // directly be compared. + Arrival time.Time + + // ECN marking of the packet when it arrived. + ECN rtcp.ECN +} + +// A Report contains the Arrival time of a CCFB or TWCC packet, the estimated +// RTT based on the feedback packet and a list of PacketReport for all +// acknowledged packets that were still in the history and not yet included in +// an earlier Report. +type Report struct { + Arrival time.Time + RTT time.Duration + PacketReports []PacketReport +} diff --git a/pkg/rtpfb/twcc_receiver.go b/pkg/rtpfb/twcc_receiver.go new file mode 100644 index 00000000..07c4e03a --- /dev/null +++ b/pkg/rtpfb/twcc_receiver.go @@ -0,0 +1,91 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "time" + + "github.com/pion/rtcp" +) + +// nolint +func convertTWCC(feedback *rtcp.TransportLayerCC) []acknowledgement { + if feedback == nil { + return nil + } + var acks []acknowledgement + + nextTimestamp := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond) + recvDeltaIndex := 0 + + offset := 0 + for _, pc := range feedback.PacketChunks { + switch chunk := pc.(type) { + case *rtcp.RunLengthChunk: + for i := uint16(0); i < chunk.RunLength; i++ { + seqNr := feedback.BaseSequenceNumber + uint16(offset) // nolint:gosec + offset++ + switch chunk.PacketStatusSymbol { + case rtcp.TypeTCCPacketNotReceived: + acks = append(acks, acknowledgement{ + sequenceNumber: seqNr, + arrived: false, + arrival: time.Time{}, + ecn: 0, + }) + case rtcp.TypeTCCPacketReceivedSmallDelta, rtcp.TypeTCCPacketReceivedLargeDelta: + delta := feedback.RecvDeltas[recvDeltaIndex] + nextTimestamp = nextTimestamp.Add(time.Duration(delta.Delta) * time.Microsecond) + recvDeltaIndex++ + acks = append(acks, acknowledgement{ + sequenceNumber: seqNr, + arrived: true, + arrival: nextTimestamp, + ecn: 0, + }) + case rtcp.TypeTCCPacketReceivedWithoutDelta: + acks = append(acks, acknowledgement{ + sequenceNumber: seqNr, + arrived: true, + arrival: time.Time{}, + ecn: 0, + }) + } + } + case *rtcp.StatusVectorChunk: + for _, s := range chunk.SymbolList { + seqNr := feedback.BaseSequenceNumber + uint16(offset) // nolint:gosec + offset++ + switch s { + case rtcp.TypeTCCPacketNotReceived: + acks = append(acks, acknowledgement{ + sequenceNumber: seqNr, + arrived: false, + arrival: time.Time{}, + ecn: 0, + }) + case rtcp.TypeTCCPacketReceivedSmallDelta, rtcp.TypeTCCPacketReceivedLargeDelta: + delta := feedback.RecvDeltas[recvDeltaIndex] + nextTimestamp = nextTimestamp.Add(time.Duration(delta.Delta) * time.Microsecond) + recvDeltaIndex++ + acks = append(acks, acknowledgement{ + sequenceNumber: seqNr, + arrived: true, + arrival: nextTimestamp, + ecn: 0, + }) + case rtcp.TypeTCCPacketReceivedWithoutDelta: + acks = append(acks, acknowledgement{ + sequenceNumber: seqNr, + arrived: true, + arrival: time.Time{}, + ecn: 0, + }) + } + } + } + } + + return acks +} diff --git a/pkg/rtpfb/twcc_receiver_test.go b/pkg/rtpfb/twcc_receiver_test.go new file mode 100644 index 00000000..4534f7b0 --- /dev/null +++ b/pkg/rtpfb/twcc_receiver_test.go @@ -0,0 +1,120 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + +package rtpfb + +import ( + "fmt" + "testing" + "time" + + "github.com/pion/rtcp" + "github.com/stretchr/testify/assert" +) + +func TestConvertTWCC(t *testing.T) { + // timeZero := time.Now() + cases := []struct { + feedback *rtcp.TransportLayerCC + expect []acknowledgement + }{ + {}, + { + // ts: timeZero.Add(2 * time.Second), + feedback: &rtcp.TransportLayerCC{ + SenderSSRC: 1, + MediaSSRC: 2, + BaseSequenceNumber: 178, + PacketStatusCount: 0, + ReferenceTime: 3, + FbPktCount: 0, + PacketChunks: []rtcp.PacketStatusChunk{}, + RecvDeltas: []*rtcp.RecvDelta{}, + }, + expect: nil, + }, + { + // ts: timeZero.Add(2 * time.Second), + feedback: &rtcp.TransportLayerCC{ + SenderSSRC: 1, + MediaSSRC: 2, + BaseSequenceNumber: 178, + PacketStatusCount: 18, + ReferenceTime: 3, + FbPktCount: 0, + PacketChunks: []rtcp.PacketStatusChunk{ + &rtcp.RunLengthChunk{ + PacketStatusSymbol: rtcp.TypeTCCPacketReceivedSmallDelta, + RunLength: 3, + }, + &rtcp.StatusVectorChunk{ + SymbolSize: rtcp.TypeTCCSymbolSizeOneBit, + SymbolList: []uint16{ + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketReceivedSmallDelta, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + }, + }, + &rtcp.StatusVectorChunk{ + SymbolSize: rtcp.TypeTCCSymbolSizeTwoBit, + SymbolList: []uint16{ + rtcp.TypeTCCPacketReceivedLargeDelta, + rtcp.TypeTCCPacketReceivedLargeDelta, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + rtcp.TypeTCCPacketNotReceived, + }, + }, + }, + RecvDeltas: []*rtcp.RecvDelta{ + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000}, + {Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 1000}, + {Type: rtcp.TypeTCCPacketReceivedLargeDelta, Delta: 1000}, + {Type: rtcp.TypeTCCPacketReceivedLargeDelta, Delta: 1000}, + }, + }, + expect: []acknowledgement{ + // first run length chunk + {sequenceNumber: 178, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 1*time.Millisecond), ecn: 0}, + {sequenceNumber: 179, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 2*time.Millisecond), ecn: 0}, + {sequenceNumber: 180, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 3*time.Millisecond), ecn: 0}, + + // first status vector chunk + {sequenceNumber: 181, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 4*time.Millisecond), ecn: 0}, + {sequenceNumber: 182, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 5*time.Millisecond), ecn: 0}, + {sequenceNumber: 183, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 6*time.Millisecond), ecn: 0}, + {sequenceNumber: 184, arrived: false, arrival: time.Time{}, ecn: 0}, + {sequenceNumber: 185, arrived: false, arrival: time.Time{}, ecn: 0}, + {sequenceNumber: 186, arrived: false, arrival: time.Time{}, ecn: 0}, + {sequenceNumber: 187, arrived: false, arrival: time.Time{}, ecn: 0}, + {sequenceNumber: 188, arrived: false, arrival: time.Time{}, ecn: 0}, + + // second status vector chunk + {sequenceNumber: 189, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 7*time.Millisecond), ecn: 0}, + {sequenceNumber: 190, arrived: true, arrival: time.Time{}.Add(3*64*time.Millisecond + 8*time.Millisecond), ecn: 0}, + {sequenceNumber: 191, arrived: false, arrival: time.Time{}, ecn: 0}, + {sequenceNumber: 192, arrived: false, arrival: time.Time{}, ecn: 0}, + {sequenceNumber: 193, arrived: false, arrival: time.Time{}, ecn: 0}, + {sequenceNumber: 194, arrived: false, arrival: time.Time{}, ecn: 0}, + {sequenceNumber: 195, arrived: false, arrival: time.Time{}, ecn: 0}, + }, + }, + } + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + res := convertTWCC(tc.feedback) + assert.Equal(t, tc.expect, res) + }) + } +}