Skip to content

Commit

Permalink
Fix overuse detection and some review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
mengelbart committed Jan 14, 2022
1 parent dce3ab5 commit 80e5a27
Show file tree
Hide file tree
Showing 14 changed files with 265 additions and 198 deletions.
5 changes: 0 additions & 5 deletions pkg/cc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package cc

import (
"errors"

"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/gcc"
"github.com/pion/logging"
Expand All @@ -13,9 +11,6 @@ import (

const transportCCURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"

// ErrUnknownSession indicates that a session ID was not assigned
var ErrUnknownSession = errors.New("unknown session ID")

// Option can be used to set initial options on GCC interceptors
type Option func(*Interceptor) error

Expand Down
94 changes: 66 additions & 28 deletions pkg/gcc/adaptive_threshold.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package gcc

import (
"math"
"time"
)

const (
maxDeltas = 60
)

type adaptiveThresholdOption func(*adaptiveThreshold)

func setInitialThreshold(t time.Duration) adaptiveThresholdOption {
Expand All @@ -12,49 +17,82 @@ func setInitialThreshold(t time.Duration) adaptiveThresholdOption {
}
}

// adaptiveThreshold implements a threshold that continuously adapts depending on
// the current measurements/estimates. This is necessary to avoid starving GCC
// in the presence of concurrent TCP flows by allowing larger Queueing delays,
// when measurements/estimates increase. overuseCoefficientU and
// overuseCoefficientD define by how much the threshold adapts. We basically
// want the threshold to increase fast, if the measurement is outside [-thresh,
// thresh] and decrease slowly if it is within.
//
// See https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-5.4
// or [Analysis and Design of the Google Congestion Control for Web Real-time
// Communication (WebRTC)](https://c3lab.poliba.it/images/6/65/Gcc-analysis.pdf)
// for a more detailed description
type adaptiveThreshold struct {
thresh time.Duration
overuseCoefficientU float64
overuseCoefficientD float64
min time.Duration
max time.Duration
thresh time.Duration
overuseCoefficientUp float64
overuseCoefficientDown float64
min time.Duration
max time.Duration
lastUpdate time.Time
numDeltas int
}

// newAdaptiveThreshold initializes a new adaptiveThreshold with default
// values taken from draft-ietf-rmcat-gcc-02
func newAdaptiveThreshold(opts ...adaptiveThresholdOption) *adaptiveThreshold {
at := &adaptiveThreshold{
thresh: time.Duration(125 * float64(time.Microsecond)),
overuseCoefficientU: 0.01,
overuseCoefficientD: 0.00018,
min: 600 * time.Microsecond,
max: 600 * time.Millisecond,
thresh: time.Duration(12500 * float64(time.Microsecond)),
overuseCoefficientUp: 0.01,
overuseCoefficientDown: 0.00018,
min: 6 * time.Millisecond,
max: 600 * time.Millisecond,
lastUpdate: time.Time{},
numDeltas: 0,
}
for _, opt := range opts {
opt(at)
}
return at
}

func (a *adaptiveThreshold) compare(estimate, dt time.Duration) (usage, time.Duration) {
absEstimate := estimate
if absEstimate < 0 {
absEstimate = -absEstimate
func (a *adaptiveThreshold) compare(estimate, dt time.Duration) (usage, time.Duration, time.Duration) {
a.numDeltas++
if a.numDeltas < 2 {
return usageNormal, estimate, a.max
}
k := a.overuseCoefficientU
if absEstimate < a.thresh {
k = a.overuseCoefficientD
t := time.Duration(minInt(a.numDeltas, maxDeltas)) * estimate
use := usageNormal
if t > a.thresh {
use = usageOver
} else if t < -a.thresh {
use = usageUnder
}
if absEstimate-a.thresh <= 15*time.Millisecond {
factor := k * float64(dt.Microseconds()) / 1000.0
add := factor * float64((absEstimate - a.thresh).Microseconds()) / 1000.0
a.thresh += time.Duration(add * float64(time.Millisecond))
}
a.thresh = clampDuration(a.thresh, a.min, a.max)
thresh := a.thresh
a.update(t)
return use, t, thresh
}

if estimate > a.thresh {
return over, a.thresh
func (a *adaptiveThreshold) update(estimate time.Duration) {
now := time.Now()
if a.lastUpdate.IsZero() {
a.lastUpdate = now
}
absEstimate := time.Duration(math.Abs(float64(estimate.Microseconds()))) * time.Microsecond
if absEstimate > a.thresh+15*time.Millisecond {
a.lastUpdate = now
return
}
if estimate < -a.thresh {
return under, a.thresh
k := a.overuseCoefficientUp
if absEstimate < a.thresh {
k = a.overuseCoefficientDown
}
return normal, a.thresh
maxTimeDelta := 100 * time.Millisecond
timeDelta := time.Duration(minInt(int(now.Sub(a.lastUpdate).Milliseconds()), int(maxTimeDelta.Milliseconds()))) * time.Millisecond
d := absEstimate - a.thresh
add := k * float64(d.Milliseconds()) * float64(timeDelta.Milliseconds())
a.thresh += time.Duration(add) * 1000 * time.Microsecond
a.thresh = clampDuration(a.thresh, a.min, a.max)
a.lastUpdate = now
}
43 changes: 36 additions & 7 deletions pkg/gcc/adaptive_threshold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,48 +23,73 @@ func TestAdaptiveThreshold(t *testing.T) {
expected: []usage{},
options: []adaptiveThresholdOption{},
},
{
name: "firstInputIsAlwaysNormal",
in: []input{{
estimate: 1 * time.Second,
delta: 0,
}},
expected: []usage{usageNormal},
options: []adaptiveThresholdOption{},
},
{
name: "singleOver",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: 20 * time.Millisecond,
delta: 0,
},
},
expected: []usage{over},
expected: []usage{usageNormal, usageOver},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
},
},
{
name: "singleNormal",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: 5 * time.Millisecond,
delta: 0,
},
},
expected: []usage{normal},
expected: []usage{usageNormal, usageNormal},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
},
},
{
name: "singleUnder",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: -20 * time.Millisecond,
delta: 0,
},
},
expected: []usage{under},
expected: []usage{usageNormal, usageUnder},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
},
},
{
name: "increaseThresholdOnOveruse",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: 25 * time.Millisecond,
delta: 30 * time.Millisecond,
Expand All @@ -74,14 +99,18 @@ func TestAdaptiveThreshold(t *testing.T) {
delta: 30 * time.Millisecond,
},
},
expected: []usage{over, normal},
expected: []usage{usageNormal, usageOver, usageNormal},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
setInitialThreshold(40 * time.Millisecond),
},
},
{
name: "overuseAfterOveruse",
in: []input{
{
estimate: 0,
delta: 0,
},
{
estimate: 20 * time.Millisecond,
delta: 30 * time.Millisecond,
Expand All @@ -91,7 +120,7 @@ func TestAdaptiveThreshold(t *testing.T) {
delta: 30 * time.Millisecond,
},
},
expected: []usage{over, over},
expected: []usage{usageNormal, usageOver, usageOver},
options: []adaptiveThresholdOption{
setInitialThreshold(10 * time.Millisecond),
},
Expand All @@ -104,7 +133,7 @@ func TestAdaptiveThreshold(t *testing.T) {
threshold := newAdaptiveThreshold(tc.options...)
usages := []usage{}
for _, in := range tc.in {
use, _ := threshold.compare(in.estimate, in.delta)
use, _, _ := threshold.compare(in.estimate, in.delta)
usages = append(usages, use)
}
assert.Equal(t, tc.expected, usages, "%v != %v", tc.expected, usages)
Expand Down
2 changes: 1 addition & 1 deletion pkg/gcc/delay_based_bwe.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newDelayController(c delayControllerConfig) *delayController {

arrivalGroupAccumulator := newArrivalGroupAccumulator()
slopeEstimator := newSlopeEstimator(newKalman())
overuseDetector := newOveruseDetector(newAdaptiveThreshold(setInitialThreshold(12*time.Millisecond)), 10*time.Millisecond)
overuseDetector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond)
rateController := newRateController(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate)

arrival := arrivalGroupAccumulator.run(ackPipe)
Expand Down
18 changes: 15 additions & 3 deletions pkg/gcc/loss_based_bwe.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ import (
"github.com/pion/logging"
)

const (
// constants from
// https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#section-6

increaseLossThreshold = 0.02
increaseTimeThreshold = 200 * time.Millisecond
increaseFactor = 1.05

decreaseLossThreshold = 0.1
decreaseTimeThreshold = 200 * time.Millisecond
)

// LossStats contains internal statistics of the loss based controller
type LossStats struct {
TargetBitrate int
Expand Down Expand Up @@ -77,11 +89,11 @@ func (e *lossBasedBandwidthEstimator) updateLossEstimate(results []Acknowledgmen
e.lock.Lock()
defer e.lock.Unlock()

if increaseLoss < 0.02 && time.Since(e.lastIncrease) > 200*time.Millisecond {
if increaseLoss < increaseLossThreshold && time.Since(e.lastIncrease) > increaseTimeThreshold {
e.log.Infof("loss controller increasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v", e.averageLoss, decreaseLoss, increaseLoss)
e.lastIncrease = time.Now()
e.bitrate = clampInt(int(1.05*float64(e.bitrate)), e.minBitrate, e.maxBitrate)
} else if decreaseLoss > 0.1 && time.Since(e.lastDecrease) > 200*time.Millisecond {
e.bitrate = clampInt(int(increaseFactor*float64(e.bitrate)), e.minBitrate, e.maxBitrate)
} else if decreaseLoss > decreaseLossThreshold && time.Since(e.lastDecrease) > decreaseTimeThreshold {
e.log.Infof("loss controller decreasing; averageLoss: %v, decreaseLoss: %v, increaseLoss: %v", e.averageLoss, decreaseLoss, increaseLoss)
e.lastDecrease = time.Now()
e.bitrate = clampInt(int(float64(e.bitrate)*(1-0.5*decreaseLoss)), e.minBitrate, e.maxBitrate)
Expand Down
4 changes: 2 additions & 2 deletions pkg/gcc/minmax.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func clampInt(b, min, max int) int {
}

func clampDuration(d, min, max time.Duration) time.Duration {
if min < d && d < max {
if min <= d && d <= max {
return d
}
if d < min {
if d <= min {
return min
}
return max
Expand Down
55 changes: 33 additions & 22 deletions pkg/gcc/overuse_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type threshold interface {
compare(estimate time.Duration, delta time.Duration) (usage, time.Duration)
compare(estimate time.Duration, delta time.Duration) (usage, time.Duration, time.Duration)
}

type overuseDetector struct {
Expand All @@ -26,41 +26,52 @@ func (d *overuseDetector) run(in <-chan DelayStats) <-chan DelayStats {
lastEstimate := 0 * time.Millisecond
lastUpdate := time.Now()
var increasingDuration time.Duration
var increasingCounter int

for estimate := range in {
for next := range in {
now := time.Now()
delta := now.Sub(lastUpdate)
lastUpdate = now

use, currentThreshold := d.threshold.compare(estimate.Estimate, estimate.lastReceiveDelta)
thresholdUse, estimate, currentThreshold := d.threshold.compare(next.Estimate, next.lastReceiveDelta)

if use != over {
increasingDuration = 0
}

if use == over {
increasingDuration += delta
if increasingDuration < d.overuseTime ||
estimate.Estimate < lastEstimate {
use = normal
use := usageNormal
if thresholdUse == usageOver {
if increasingDuration == 0 {
increasingDuration = delta / 2
} else {
increasingDuration += delta
}
increasingCounter++
if increasingDuration > d.overuseTime && increasingCounter > 1 {
if estimate > lastEstimate {
use = usageOver
}
}
}
if thresholdUse == usageUnder {
increasingCounter = 0
increasingDuration = 0
use = usageUnder
}

if use == over {
if thresholdUse == usageNormal {
increasingDuration = 0
increasingCounter = 0
use = usageNormal
}
lastEstimate = estimate

out <- DelayStats{
Measurement: estimate.Measurement,
Estimate: estimate.Estimate,
Threshold: currentThreshold,
Usage: use,

State: 0,
TargetBitrate: 0,
RTT: 0,
Measurement: next.Measurement,
Estimate: estimate,
Threshold: currentThreshold,
lastReceiveDelta: delta,
Usage: use,
State: 0,
TargetBitrate: 0,
RTT: 0,
}
lastEstimate = estimate.Estimate
}
close(out)
}()
Expand Down
Loading

0 comments on commit 80e5a27

Please sign in to comment.