Skip to content

Commit

Permalink
Revert "Defer packet ordering until building ..."
Browse files Browse the repository at this point in the history
Reverted in favor of #198

This reverts commit dd00fe3.
  • Loading branch information
kcaffrey committed Sep 28, 2023
1 parent 306e11d commit 648ceca
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 36 deletions.
61 changes: 33 additions & 28 deletions pkg/twcc/twcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package twcc

import (
"math"
"sort"

"github.com/pion/rtcp"
)
Expand Down Expand Up @@ -45,8 +44,10 @@ func (r *Recorder) Record(mediaSSRC uint32, sequenceNumber uint16, arrivalTime i
if sequenceNumber < 0x0fff && (r.lastSequenceNumber&0xffff) > 0xf000 {
r.cycles += 1 << 16
}
pkt := pktInfo{r.cycles | uint32(sequenceNumber), arrivalTime}
r.receivedPackets = append(r.receivedPackets, pkt)
r.receivedPackets = insertSorted(r.receivedPackets, pktInfo{
sequenceNumber: r.cycles | uint32(sequenceNumber),
arrivalTime: arrivalTime,
})
r.lastSequenceNumber = sequenceNumber
}

Expand All @@ -55,41 +56,45 @@ func (r *Recorder) PacketsHeld() int {
return len(r.receivedPackets)
}

func insertSorted(list []pktInfo, element pktInfo) []pktInfo {
if len(list) == 0 {
return append(list, element)
}
for i := len(list) - 1; i >= 0; i-- {
if list[i].sequenceNumber < element.sequenceNumber {
list = append(list, pktInfo{})
copy(list[i+2:], list[i+1:])
list[i+1] = element
return list
}
if list[i].sequenceNumber == element.sequenceNumber {
list[i] = element
return list
}
}
// element.sequenceNumber is between 0 and first ever received sequenceNumber
return append([]pktInfo{element}, list...)
}

// BuildFeedbackPacket creates a new RTCP packet containing a TWCC feedback report.
func (r *Recorder) BuildFeedbackPacket() []rtcp.Packet {
// sort received packets by sequence number, with earliest arrivalTime first in cases of duplicates
canBuild := false
sort.Slice(r.receivedPackets, func(i, j int) bool {
if r.receivedPackets[i].sequenceNumber == r.receivedPackets[j].sequenceNumber {
return r.receivedPackets[i].arrivalTime < r.receivedPackets[j].arrivalTime
}
canBuild = true // need at least 2 non-duplicate packets
return r.receivedPackets[i].sequenceNumber < r.receivedPackets[j].sequenceNumber
})
if !canBuild {
if len(r.receivedPackets) < 2 {
return nil
}

feedback := newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt)
r.fbPktCnt++
feedback.setBase(
uint16(r.receivedPackets[0].sequenceNumber&0xffff),
r.receivedPackets[0].arrivalTime,
)
feedback.setBase(uint16(r.receivedPackets[0].sequenceNumber&0xffff), r.receivedPackets[0].arrivalTime)

var pkts []rtcp.Packet
var prevSN uint32
for i, pkt := range r.receivedPackets {
if i == 0 || pkt.sequenceNumber != prevSN {
ok := feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
if !ok {
pkts = append(pkts, feedback.getRTCP())
feedback = newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt)
r.fbPktCnt++
feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
}
for _, pkt := range r.receivedPackets {
ok := feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
if !ok {
pkts = append(pkts, feedback.getRTCP())
feedback = newFeedback(r.senderSSRC, r.mediaSSRC, r.fbPktCnt)
r.fbPktCnt++
feedback.addReceived(uint16(pkt.sequenceNumber&0xffff), pkt.arrivalTime)
}
prevSN = pkt.sequenceNumber
}
r.receivedPackets = []pktInfo{}
pkts = append(pkts, feedback.getRTCP())
Expand Down
156 changes: 148 additions & 8 deletions pkg/twcc/twcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package twcc

import (
"fmt"
"testing"

"github.com/pion/rtcp"
Expand Down Expand Up @@ -618,10 +619,10 @@ func TestDuplicatePackets(t *testing.T) {

arrivalTime := int64(scaleFactorReferenceTime)
addRun(t, r, []uint16{12, 13, 13, 14}, []int64{
increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor),
arrivalTime + rtcp.TypeTCCDeltaScaleFactor*256,
increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor),
increaseTime(&arrivalTime, rtcp.TypeTCCDeltaScaleFactor),
arrivalTime,
arrivalTime,
arrivalTime,
arrivalTime,
})

rtcpPackets := r.BuildFeedbackPacket()
Expand Down Expand Up @@ -649,9 +650,9 @@ func TestDuplicatePackets(t *testing.T) {
},
},
RecvDeltas: []*rtcp.RecvDelta{
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 250},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
{Type: rtcp.TypeTCCPacketReceivedSmallDelta, Delta: 0},
},
}, rtcpToTwcc(t, rtcpPackets)[0])
marshalAll(t, rtcpPackets)
Expand Down Expand Up @@ -848,7 +849,146 @@ func TestReorderedPackets(t *testing.T) {
marshalAll(t, rtcpPackets)
}

func TestPacketsHeld(t *testing.T) {
func TestInsertSorted(t *testing.T) {
cases := []struct {
l []pktInfo
e pktInfo
expected []pktInfo
}{
{
l: []pktInfo{},
e: pktInfo{},
expected: []pktInfo{{
sequenceNumber: 0,
arrivalTime: 0,
}},
},
{
l: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
},
e: pktInfo{
sequenceNumber: 2,
arrivalTime: 0,
},
expected: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
},
{
l: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
e: pktInfo{
sequenceNumber: 1,
arrivalTime: 0,
},
expected: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
},
{
l: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
e: pktInfo{
sequenceNumber: 1,
arrivalTime: 0,
},
expected: []pktInfo{
{
sequenceNumber: 0,
arrivalTime: 0,
},
{
sequenceNumber: 1,
arrivalTime: 0,
},
{
sequenceNumber: 2,
arrivalTime: 0,
},
},
},
{
l: []pktInfo{
{
sequenceNumber: 10,
arrivalTime: 0,
},
},
e: pktInfo{
sequenceNumber: 9,
arrivalTime: 0,
},
expected: []pktInfo{
{
sequenceNumber: 9,
arrivalTime: 0,
},
{
sequenceNumber: 10,
arrivalTime: 0,
},
},
},
}
for i, c := range cases {
c := c
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
assert.Equal(t, c.expected, insertSorted(c.l, c.e))
})
}
}

func TestPacketsHheld(t *testing.T) {
r := NewRecorder(5000)
assert.Zero(t, r.PacketsHeld())

Expand Down

0 comments on commit 648ceca

Please sign in to comment.