Skip to content

Commit

Permalink
Implemented NetworkConditioner
Browse files Browse the repository at this point in the history
NetworkConditioner allows to replicate in a predictable way
network condition, such as a 3G or LTE network.
There is a list of pre-defined NetworkConditionerPreset,
or the user can provided their own.
  • Loading branch information
Antonito committed Jun 23, 2021
1 parent 8ae6e0d commit 2dfbb94
Show file tree
Hide file tree
Showing 6 changed files with 540 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Check out the **[contributing wiki](https://github.com/pion/webrtc/wiki/Contribu
* [OrlandoCo](https://github.com/OrlandoCo)
* [Tarrence van As](https://github.com/tarrencev)
* [Winlin](https://github.com/ossrs/srs) - *UDP proxy to communicate with real servers*
* [Antoine Baché](https://github.com/Antonito) *Network Conditioner*

### License
MIT License - see [LICENSE](LICENSE) for full text
51 changes: 42 additions & 9 deletions vnet/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var (
errNoRouterLinked = errors.New("no router linked")
errInvalidPortNumber = errors.New("invalid port number")
errUnexpectedTypeSwitchFailure = errors.New("unexpected type-switch failure")
errBindFailerFor = errors.New("bind failed for")
errBindFailedFor = errors.New("bind failed for")
errEndPortLessThanStart = errors.New("end port is less than the start")
errPortSpaceExhausted = errors.New("port space exhausted")
errVNetDisabled = errors.New("vnet is not enabled")
Expand All @@ -40,11 +40,12 @@ func newMACAddress() net.HardwareAddr {
}

type vNet struct {
interfaces []*Interface // read-only
staticIPs []net.IP // read-only
router *Router // read-only
udpConns *udpConnMap // read-only
mutex sync.RWMutex
interfaces []*Interface // read-only
staticIPs []net.IP // read-only
router *Router // read-only
udpConns *udpConnMap // read-only
networkConditioner NetworkConditioner
mutex sync.RWMutex
}

func (v *vNet) _getInterfaces() ([]*Interface, error) {
Expand Down Expand Up @@ -124,6 +125,11 @@ func (v *vNet) setRouter(r *Router) error {
}

func (v *vNet) onInboundChunk(c Chunk) {
if v.networkConditioner.handleDownLink(c) {
// Drop packet if requested.
return
}

v.mutex.Lock()
defer v.mutex.Unlock()

Expand Down Expand Up @@ -279,6 +285,11 @@ func (v *vNet) resolveUDPAddr(network, address string) (*net.UDPAddr, error) {
}

func (v *vNet) write(c Chunk) error {
if v.networkConditioner.handleUpLink(c) {
// Drop packet if requested.
return nil
}

if c.Network() == udpString {
if udp, ok := c.(*chunkUDP); ok {
if c.getDestinationIP().IsLoopback() {
Expand Down Expand Up @@ -407,7 +418,7 @@ func (v *vNet) allocateLocalAddr(ip net.IP, port int) error {
}

if len(ips) == 0 {
return fmt.Errorf("%w %s", errBindFailerFor, ip.String())
return fmt.Errorf("%w %s", errBindFailedFor, ip.String())
}

// check if all these transport addresses are not in use
Expand Down Expand Up @@ -459,9 +470,13 @@ type NetConfig struct {

// StaticIP is deprecated. Use StaticIPs.
StaticIP string

// NetworkConditioner to apply to the transport layer.
// This parameter is applied in supplement of Router-wide settings, such as jitter.
NetworkConditioner *NetworkConditioner
}

// Net represents a local network stack euivalent to a set of layers from NIC
// Net represents a local network stack equivalent to a set of layers from NIC
// up to the transport (UDP / TCP) layer.
type Net struct {
v *vNet
Expand Down Expand Up @@ -531,9 +546,13 @@ func NewNet(config *NetConfig) *Net {
udpConns: newUDPConnMap(),
}

return &Net{
n := &Net{
v: v,
}

n.SetNetworkConditioner(config.NetworkConditioner)

return n
}

// Interfaces returns a list of the system's network interfaces.
Expand Down Expand Up @@ -656,6 +675,20 @@ func (n *Net) IsVirtual() bool {
return n.v != nil
}

// SetNetworkConditioner sets the current NetworkConditioner.
// NetworkCondition are applied in supplement of Router-wide settings, such as jitter.
func (n *Net) SetNetworkConditioner(conditioner *NetworkConditioner) {
var networkConditioner *NetworkConditioner
if conditioner == nil {
networkConditioner = NewNetworkConditioner(NetworkConditionerPresetNone)
} else {
networkConditioner = conditioner
}

n.v.networkConditioner.DownLink.update(networkConditioner.DownLink)
n.v.networkConditioner.UpLink.update(networkConditioner.UpLink)
}

// Dialer is identical to net.Dialer excepts that its methods
// (Dial, DialContext) are overridden to use virtual network.
// Use vnet.CreateDialer() to create an instance of this Dialer.
Expand Down
132 changes: 132 additions & 0 deletions vnet/network_conditioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package vnet

import (
"math/rand"
"sync/atomic"
"time"
"unsafe"
)

const (
// NetworkConditionPacketLossDenominator is the denominator by which the `NetworkCondition.PacketLoss` value
// will be divided.
NetworkConditionPacketLossDenominator uint32 = 1000

kiloBytesInBytes = 1000
// Value chosen arbitrarily, there may be a better one.
bandwidthComputeThreshold = 2 * time.Second
)

// NetworkConditioner hold the NetworkCondition of DownLink and UpLink channels.
type NetworkConditioner struct {
// Internals stats

// Total of downLink bytes
// Must be accessed atomically.
inBytes uint64
pendingInBytes uint64
lastInByteChunkTimestamp int64

// Total of upLink bytes
// Must be accessed atomically.
outBytes uint64
pendingOutBytes uint64
lastOutByteChunkTimestamp int64

// DownLink represents the DownLink condition.
DownLink NetworkCondition
// UpLink represents the UpLink condition.
UpLink NetworkCondition
}

func (n *NetworkConditioner) handleDownLink(c Chunk) bool {
atomic.AddUint64(&n.pendingInBytes, uint64(len(c.UserData())))

bandwidthLimiterSleep := n.bandwidthLimiter(&n.DownLink.MaxBandwidth, &n.lastInByteChunkTimestamp, &n.inBytes, &n.pendingInBytes)

// Apply constant latency if required
if sleepNeeded := time.Duration(atomic.LoadInt64((*int64)(&n.DownLink.Latency))) - bandwidthLimiterSleep; sleepNeeded > 0 {
time.Sleep(sleepNeeded)
}

return n.DownLink.shouldDropPacket()
}

func (n *NetworkConditioner) handleUpLink(c Chunk) bool {
atomic.AddUint64(&n.pendingOutBytes, uint64(len(c.UserData())))

bandwidthLimiterSleep := n.bandwidthLimiter(&n.UpLink.MaxBandwidth, &n.lastOutByteChunkTimestamp, &n.outBytes, &n.pendingOutBytes)

// Apply constant latency if required
if sleepNeeded := time.Duration(atomic.LoadInt64((*int64)(&n.UpLink.Latency))) - bandwidthLimiterSleep; sleepNeeded > 0 {
time.Sleep(sleepNeeded)
}

return n.UpLink.shouldDropPacket()
}

func (n *NetworkConditioner) bandwidthLimiter(maxBandwidth **uint32, lastTimestamp *int64, totalBytes *uint64, pendingBytes *uint64) time.Duration {
// Is safe because refers to a `MaxBandwidth` field in an initialized NetworkCondition struct.
// nolint:gosec
bandwidthLimit := (*uint32)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(maxBandwidth))))
bandwidthLimiterSleep := 0 * time.Millisecond

if bandwidthLimit != nil {
now := time.Now().UnixNano()
isFirstLoop := atomic.CompareAndSwapInt64(lastTimestamp, 0, now)

if !isFirstLoop {
duration := now - atomic.LoadInt64(lastTimestamp)
if time.Duration(duration) > bandwidthComputeThreshold {
previouslyPending := atomic.SwapUint64(pendingBytes, 0)

bandwidthKbps := (float64(previouslyPending) / float64(duration) * float64(time.Second)) / kiloBytesInBytes

atomic.AddUint64(totalBytes, previouslyPending)
atomic.StoreInt64(lastTimestamp, now)

if bandwidthKbps > float64(*bandwidthLimit) {
exceeding := bandwidthKbps - float64(*bandwidthLimit)
bandwidthLimiterSleep = time.Duration(exceeding * float64(time.Second))

time.Sleep(bandwidthLimiterSleep)
}
}
}
}

return bandwidthLimiterSleep
}

// NetworkCondition represents the network condition parameter of a data transmission channel direction.
type NetworkCondition struct {
// Latency represents the network delay.
Latency time.Duration
// MaxBandwidth is the maximum bandwidth, in Kbps.
MaxBandwidth *uint32
// PacketLoss in percentage.
// This value will be divided by NetworkConditionPacketLossDenominator.
// Any quotient > 1.00 is meaningless.
PacketLoss uint32
}

// Updates atomically each member of the current NetworkCondition with the new one.
func (c *NetworkCondition) update(newCondition NetworkCondition) {
// Each member is atomically updated, but not the whole struct.
// This isn't a problem, as it may only affect a few packets – and removes the need for locks.
// nolint:gosec
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&c.MaxBandwidth)), unsafe.Pointer(newCondition.MaxBandwidth))
atomic.StoreInt64((*int64)(&c.Latency), int64(newCondition.Latency))
atomic.StoreUint32(&c.PacketLoss, newCondition.PacketLoss)
}

func (c *NetworkCondition) shouldDropPacket() bool {
packetLoss := atomic.LoadUint32(&c.PacketLoss)

// nolint:gosec
if packetLoss > 0 && uint32(rand.Int31n(int32(NetworkConditionPacketLossDenominator))) < packetLoss {
return true
}

return false
}
26 changes: 26 additions & 0 deletions vnet/network_conditioner_preset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package vnet

import (
"testing"
)

func TestNetworkConditionerPresets(t *testing.T) {
presets := [...]NetworkConditionerPreset{
NetworkConditionerPresetNone,
NetworkConditionerPresetFullLoss,
NetworkConditionerPreset3G,
NetworkConditionerPresetDSL,
NetworkConditionerPresetEdge,
NetworkConditionerPresetLTE,
NetworkConditionerPresetVeryBadNetwork,
NetworkConditionerPresetWiFi,
}

for _, preset := range presets {
conditioner := NewNetworkConditioner(preset)

if conditioner == nil {
t.Fail()
}
}
}
Loading

0 comments on commit 2dfbb94

Please sign in to comment.