Skip to content

Commit

Permalink
Implement first version of GCC
Browse files Browse the repository at this point in the history
Implements a first version of Google Congestion Control as described in
https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02
  • Loading branch information
mengelbart authored and Sean-Der committed Jan 15, 2022
1 parent ec82d53 commit 6915d55
Show file tree
Hide file tree
Showing 32 changed files with 3,843 additions and 0 deletions.
26 changes: 26 additions & 0 deletions internal/cc/acknowledgment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cc

import (
"fmt"
"time"
)

// Acknowledgment holds information about a packet and if/when it has been
// sent/received.
type Acknowledgment struct {
TLCC uint16
Size int
Departure time.Time
Arrival time.Time
RTT time.Duration
}

func (a Acknowledgment) String() string {
s := "ACK:\n"
s += fmt.Sprintf("\tTLCC:\t%v\n", a.TLCC)
s += fmt.Sprintf("\tSIZE:\t%v\n", a.Size)
s += fmt.Sprintf("\tDEPARTURE:\t%v\n", int64(float64(a.Departure.UnixNano())/1e+6))
s += fmt.Sprintf("\tARRIVAL:\t%v\n", int64(float64(a.Arrival.UnixNano())/1e+6))
s += fmt.Sprintf("\tRTT:\t%v\n", a.RTT)
return s
}
2 changes: 2 additions & 0 deletions internal/cc/cc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package cc implements common constructs used by Congestion Controllers
package cc
159 changes: 159 additions & 0 deletions internal/cc/feedback_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package cc

import (
"errors"
"sync"
"time"

"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)

// TwccExtensionAttributesKey identifies the TWCC value in the attribute collection
// so we don't need to reparse
const TwccExtensionAttributesKey = iota

var (
errMissingTWCCExtension = errors.New("missing transport layer cc header extension")
errUnknownFeedbackFormat = errors.New("unknown feedback format")

errInvalidFeedback = errors.New("invalid feedback")
)

// FeedbackAdapter converts incoming RTCP Packets (TWCC and RFC8888) into Acknowledgments.
// Acknowledgments are the common format that Congestion Controllers in Pion understand.
type FeedbackAdapter struct {
lock sync.Mutex
history map[uint16]Acknowledgment
}

// NewFeedbackAdapter returns a new FeedbackAdapter
func NewFeedbackAdapter() *FeedbackAdapter {
return &FeedbackAdapter{
history: make(map[uint16]Acknowledgment),
}
}

// OnSent records that and when an outgoing packet was sent for later mapping to
// acknowledgments
func (f *FeedbackAdapter) OnSent(ts time.Time, header *rtp.Header, size int, attributes interceptor.Attributes) error {
hdrExtensionID := attributes.Get(TwccExtensionAttributesKey)
id, ok := hdrExtensionID.(uint8)
if !ok || hdrExtensionID == 0 {
return errMissingTWCCExtension
}
sequenceNumber := header.GetExtension(id)
var tccExt rtp.TransportCCExtension
err := tccExt.Unmarshal(sequenceNumber)
if err != nil {
return err
}

f.lock.Lock()
defer f.lock.Unlock()
f.history[tccExt.TransportSequence] = Acknowledgment{
TLCC: tccExt.TransportSequence,
Size: header.MarshalSize() + size,
Departure: ts,
Arrival: time.Time{},
RTT: 0,
}
return nil
}

// OnFeedback converts incoming RTCP packet feedback to Acknowledgments.
// Currently only TWCC is supported.
func (f *FeedbackAdapter) OnFeedback(ts time.Time, feedback rtcp.Packet) ([]Acknowledgment, error) {
f.lock.Lock()
defer f.lock.Unlock()

switch fb := feedback.(type) {
case *rtcp.TransportLayerCC:
return f.onIncomingTransportCC(ts, fb)
default:
return nil, errUnknownFeedbackFormat
}
}

func (f *FeedbackAdapter) unpackRunLengthChunk(ts time.Time, start uint16, refTime time.Time, chunk *rtcp.RunLengthChunk, deltas []*rtcp.RecvDelta) (consumedDeltas int, nextRef time.Time, acks []Acknowledgment, err error) {
result := make([]Acknowledgment, chunk.RunLength)
deltaIndex := 0

end := start + chunk.RunLength
resultIndex := 0
for i := start; i != end; i++ {
if ack, ok := f.history[i]; ok {
if chunk.PacketStatusSymbol != rtcp.TypeTCCPacketNotReceived {
if len(deltas)-1 < deltaIndex {
return deltaIndex, refTime, result, errInvalidFeedback
}
refTime = refTime.Add(time.Duration(deltas[deltaIndex].Delta) * time.Microsecond)
ack.Arrival = refTime
ack.RTT = ts.Sub(ack.Departure)
deltaIndex++
}
result[resultIndex] = ack
}
resultIndex++
}
return deltaIndex, refTime, result, nil
}

func (f *FeedbackAdapter) unpackStatusVectorChunk(ts time.Time, start uint16, refTime time.Time, chunk *rtcp.StatusVectorChunk, deltas []*rtcp.RecvDelta) (consumedDeltas int, nextRef time.Time, acks []Acknowledgment, err error) {
result := make([]Acknowledgment, len(chunk.SymbolList))
deltaIndex := 0
resultIndex := 0
for i, symbol := range chunk.SymbolList {
if ack, ok := f.history[start+uint16(i)]; ok {
if symbol != rtcp.TypeTCCPacketNotReceived {
if len(deltas)-1 < deltaIndex {
return deltaIndex, refTime, result, errInvalidFeedback
}
refTime = refTime.Add(time.Duration(deltas[deltaIndex].Delta) * time.Microsecond)
ack.Arrival = refTime
ack.RTT = ts.Sub(ack.Departure)
deltaIndex++
}
result[resultIndex] = ack
}
resultIndex++
}

return deltaIndex, refTime, result, nil
}

func (f *FeedbackAdapter) onIncomingTransportCC(ts time.Time, feedback *rtcp.TransportLayerCC) ([]Acknowledgment, error) {
result := []Acknowledgment{}

index := feedback.BaseSequenceNumber
refTime := time.Time{}.Add(time.Duration(feedback.ReferenceTime) * 64 * time.Millisecond)
recvDeltas := feedback.RecvDeltas

for _, chunk := range feedback.PacketChunks {
switch chunk := chunk.(type) {
case *rtcp.RunLengthChunk:
n, nextRefTime, acks, err := f.unpackRunLengthChunk(ts, index, refTime, chunk, recvDeltas)
if err != nil {
return nil, err
}
refTime = nextRefTime
result = append(result, acks...)
recvDeltas = recvDeltas[n:]
index = uint16(int(index) + len(acks))
case *rtcp.StatusVectorChunk:
n, nextRefTime, acks, err := f.unpackStatusVectorChunk(ts, index, refTime, chunk, recvDeltas)
if err != nil {
return nil, err
}
refTime = nextRefTime
result = append(result, acks...)
recvDeltas = recvDeltas[n:]
index = uint16(int(index) + len(acks))
default:
return nil, errInvalidFeedback
}
}

return result, nil
}
Loading

0 comments on commit 6915d55

Please sign in to comment.