Skip to content

Commit

Permalink
Merge pull request #2615 from lucas-clemente/bucket-pacer
Browse files Browse the repository at this point in the history
implement a token-bucket pacing algorithm
  • Loading branch information
marten-seemann authored Jun 24, 2020
2 parents 1607ace + fda00fe commit b0b996e
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 148 deletions.
8 changes: 2 additions & 6 deletions internal/ackhandler/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,8 @@ type SentPacketHandler interface {
// TimeUntilSend is the time when the next packet should be sent.
// It is used for pacing packets.
TimeUntilSend() time.Time
// ShouldSendNumPackets returns the number of packets that should be sent immediately.
// It always returns a number greater or equal than 1.
// A number greater than 1 is returned when the pacing delay is smaller than the minimum pacing delay.
// Note that the number of packets is only calculated based on the pacing algorithm.
// Before sending any packet, SendingAllowed() must be called to learn if we can actually send it.
ShouldSendNumPackets() int
// HasPacingBudget says if the pacer allows sending of a (full size) packet at this moment.
HasPacingBudget() bool

// only to be called once the handshake is complete
QueueProbePacket(protocol.EncryptionLevel) bool /* was a packet queued */
Expand Down
18 changes: 3 additions & 15 deletions internal/ackhandler/sent_packet_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ackhandler
import (
"errors"
"fmt"
"math"
"time"

"github.com/lucas-clemente/quic-go/internal/congestion"
Expand Down Expand Up @@ -46,8 +45,6 @@ func newPacketNumberSpace(initialPN protocol.PacketNumber) *packetNumberSpace {
}

type sentPacketHandler struct {
nextSendTime time.Time

initialPackets *packetNumberSpace
handshakePackets *packetNumberSpace
appDataPackets *packetNumberSpace
Expand Down Expand Up @@ -254,7 +251,6 @@ func (h *sentPacketHandler) sentPacketImpl(packet *Packet) bool /* is ack-elicit
}
h.congestion.OnPacketSent(packet.SendTime, h.bytesInFlight, packet.PacketNumber, packet.Length, isAckEliciting)

h.nextSendTime = utils.MaxTime(h.nextSendTime, packet.SendTime).Add(h.congestion.TimeUntilSend(h.bytesInFlight))
return isAckEliciting
}

Expand Down Expand Up @@ -720,19 +716,11 @@ func (h *sentPacketHandler) SendMode() SendMode {
}

func (h *sentPacketHandler) TimeUntilSend() time.Time {
return h.nextSendTime
return h.congestion.TimeUntilSend(h.bytesInFlight)
}

func (h *sentPacketHandler) ShouldSendNumPackets() int {
if h.numProbesToSend > 0 {
// RTO probes should not be paced, but must be sent immediately.
return h.numProbesToSend
}
delay := h.congestion.TimeUntilSend(h.bytesInFlight)
if delay == 0 || delay > protocol.MinPacingDelay {
return 1
}
return int(math.Ceil(float64(protocol.MinPacingDelay) / float64(delay)))
func (h *sentPacketHandler) HasPacingBudget() bool {
return h.congestion.HasPacingBudget()
}

func (h *sentPacketHandler) AmplificationWindow() protocol.ByteCount {
Expand Down
47 changes: 10 additions & 37 deletions internal/ackhandler/sent_packet_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
"github.com/lucas-clemente/quic-go/internal/wire"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -427,7 +428,6 @@ var _ = Describe("SentPacketHandler", func() {
protocol.ByteCount(42),
true,
)
cong.EXPECT().TimeUntilSend(gomock.Any())
handler.SentPacket(&Packet{
PacketNumber: 1,
Length: 42,
Expand All @@ -439,7 +439,6 @@ var _ = Describe("SentPacketHandler", func() {
It("should call MaybeExitSlowStart and OnPacketAcked", func() {
rcvTime := time.Now().Add(-5 * time.Second)
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(3)
cong.EXPECT().TimeUntilSend(gomock.Any()).Times(3)
gomock.InOrder(
cong.EXPECT().MaybeExitSlowStart(), // must be called before packets are acked
cong.EXPECT().OnPacketAcked(protocol.PacketNumber(1), protocol.ByteCount(1), protocol.ByteCount(3), rcvTime),
Expand All @@ -454,7 +453,6 @@ var _ = Describe("SentPacketHandler", func() {

It("doesn't call OnPacketAcked when a retransmitted packet is acked", func() {
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2)
cong.EXPECT().TimeUntilSend(gomock.Any()).Times(2)
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)}))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2}))
// lose packet 1
Expand All @@ -472,7 +470,6 @@ var _ = Describe("SentPacketHandler", func() {

It("calls OnPacketAcked and OnPacketLost with the right bytes_in_flight value", func() {
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(4)
cong.EXPECT().TimeUntilSend(gomock.Any()).Times(4)
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)}))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2, SendTime: time.Now().Add(-30 * time.Minute)}))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 3, SendTime: time.Now().Add(-30 * time.Minute)}))
Expand All @@ -498,7 +495,6 @@ var _ = Describe("SentPacketHandler", func() {
It("passes the bytes in flight to the congestion controller", func() {
handler.ReceivedPacket(protocol.EncryptionHandshake)
cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(42), gomock.Any(), protocol.ByteCount(42), true)
cong.EXPECT().TimeUntilSend(gomock.Any())
handler.SentPacket(&Packet{
Length: 42,
EncryptionLevel: protocol.EncryptionInitial,
Expand All @@ -512,7 +508,6 @@ var _ = Describe("SentPacketHandler", func() {
It("returns SendNone if limited by the 3x limit", func() {
handler.ReceivedBytes(100)
cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(300), gomock.Any(), protocol.ByteCount(300), true)
cong.EXPECT().TimeUntilSend(gomock.Any())
handler.SentPacket(&Packet{
Length: 300,
EncryptionLevel: protocol.EncryptionInitial,
Expand All @@ -527,7 +522,6 @@ var _ = Describe("SentPacketHandler", func() {
It("limits the window to 3x the bytes received, to avoid amplification attacks", func() {
handler.ReceivedPacket(protocol.EncryptionInitial) // receiving an Initial packet doesn't validate the client's address
cong.EXPECT().OnPacketSent(gomock.Any(), protocol.ByteCount(50), gomock.Any(), protocol.ByteCount(50), true)
cong.EXPECT().TimeUntilSend(gomock.Any())
handler.SentPacket(&Packet{
Length: 50,
EncryptionLevel: protocol.EncryptionInitial,
Expand All @@ -549,7 +543,6 @@ var _ = Describe("SentPacketHandler", func() {
It("allows sending of ACKs when we're keeping track of MaxOutstandingSentPackets packets", func() {
handler.ReceivedPacket(protocol.EncryptionHandshake)
cong.EXPECT().CanSend(gomock.Any()).Return(true).AnyTimes()
cong.EXPECT().TimeUntilSend(gomock.Any()).AnyTimes()
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
for i := protocol.PacketNumber(1); i < protocol.MaxOutstandingSentPackets; i++ {
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: i}))
Expand All @@ -568,35 +561,17 @@ var _ = Describe("SentPacketHandler", func() {
Expect(handler.SendMode()).To(Equal(SendPTOHandshake))
})

It("gets the pacing delay", func() {
sendTime := time.Now().Add(-time.Minute)
handler.bytesInFlight = 100
cong.EXPECT().OnPacketSent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
cong.EXPECT().TimeUntilSend(protocol.ByteCount(100)).Return(time.Hour)
handler.SentPacket(&Packet{PacketNumber: 1, SendTime: sendTime, EncryptionLevel: protocol.Encryption1RTT})
Expect(handler.TimeUntilSend()).To(Equal(sendTime.Add(time.Hour)))
})

It("allows sending of all RTO probe packets", func() {
handler.numProbesToSend = 5
Expect(handler.ShouldSendNumPackets()).To(Equal(5))
})

It("allows sending of one packet, if it should be sent immediately", func() {
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(time.Duration(0))
Expect(handler.ShouldSendNumPackets()).To(Equal(1))
})

It("allows sending of multiple packets, if the pacing delay is smaller than the minimum", func() {
pacingDelay := protocol.MinPacingDelay / 10
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(pacingDelay)
Expect(handler.ShouldSendNumPackets()).To(Equal(10))
It("says if it has pacing budget", func() {
cong.EXPECT().HasPacingBudget().Return(true)
Expect(handler.HasPacingBudget()).To(BeTrue())
cong.EXPECT().HasPacingBudget().Return(false)
Expect(handler.HasPacingBudget()).To(BeFalse())
})

It("allows sending of multiple packets, if the pacing delay is smaller than the minimum, and not a fraction", func() {
pacingDelay := protocol.MinPacingDelay * 2 / 5
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(pacingDelay)
Expect(handler.ShouldSendNumPackets()).To(Equal(3))
It("returns the pacing delay", func() {
t := time.Now()
cong.EXPECT().TimeUntilSend(gomock.Any()).Return(t)
Expect(handler.TimeUntilSend()).To(Equal(t))
})
})

Expand Down Expand Up @@ -694,7 +669,6 @@ var _ = Describe("SentPacketHandler", func() {
}))
Expect(handler.OnLossDetectionTimeout()).To(Succeed())
Expect(handler.SendMode()).To(Equal(SendPTOAppData))
Expect(handler.ShouldSendNumPackets()).To(Equal(2))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2}))
Expect(handler.SendMode()).To(Equal(SendPTOAppData))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 3}))
Expand All @@ -707,7 +681,6 @@ var _ = Describe("SentPacketHandler", func() {
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 1, SendTime: time.Now().Add(-time.Hour)}))
Expect(handler.OnLossDetectionTimeout()).To(Succeed())
Expect(handler.SendMode()).To(Equal(SendPTOAppData))
Expect(handler.ShouldSendNumPackets()).To(Equal(2))
handler.SentPacket(ackElicitingPacket(&Packet{PacketNumber: 2}))
Expect(handler.SendMode()).To(Equal(SendPTOAppData))
for p := protocol.PacketNumber(3); p < 30; p++ {
Expand Down
16 changes: 13 additions & 3 deletions internal/congestion/cubic_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type cubicSender struct {
rttStats *RTTStats
stats connectionStats
cubic *Cubic
pacer *pacer
clock Clock

reno bool

Expand Down Expand Up @@ -75,7 +77,7 @@ func NewCubicSender(clock Clock, rttStats *RTTStats, reno bool) *cubicSender {
}

func newCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestionWindow, initialMaxCongestionWindow protocol.ByteCount) *cubicSender {
return &cubicSender{
c := &cubicSender{
rttStats: rttStats,
largestSentPacketNumber: protocol.InvalidPacketNumber,
largestAckedPacketNumber: protocol.InvalidPacketNumber,
Expand All @@ -88,13 +90,20 @@ func newCubicSender(clock Clock, rttStats *RTTStats, reno bool, initialCongestio
maxCongestionWindow: initialMaxCongestionWindow,
numConnections: defaultNumConnections,
cubic: NewCubic(clock),
clock: clock,
reno: reno,
}
c.pacer = newPacer(c.BandwidthEstimate)
return c
}

// TimeUntilSend returns when the next packet should be sent.
func (c *cubicSender) TimeUntilSend(bytesInFlight protocol.ByteCount) time.Duration {
return c.rttStats.SmoothedRTT() * time.Duration(maxDatagramSize) / time.Duration(2*c.GetCongestionWindow())
func (c *cubicSender) TimeUntilSend(_ protocol.ByteCount) time.Time {
return c.pacer.TimeUntilSend()
}

func (c *cubicSender) HasPacingBudget() bool {
return c.pacer.Budget(c.clock.Now()) >= maxDatagramSize
}

func (c *cubicSender) OnPacketSent(
Expand All @@ -104,6 +113,7 @@ func (c *cubicSender) OnPacketSent(
bytes protocol.ByteCount,
isRetransmittable bool,
) {
c.pacer.SentPacket(sentTime, bytes)
if !isRetransmittable {
return
}
Expand Down
3 changes: 2 additions & 1 deletion internal/congestion/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (

// A SendAlgorithm performs congestion control
type SendAlgorithm interface {
TimeUntilSend(bytesInFlight protocol.ByteCount) time.Duration
TimeUntilSend(bytesInFlight protocol.ByteCount) time.Time
HasPacingBudget() bool
OnPacketSent(sentTime time.Time, bytesInFlight protocol.ByteCount, packetNumber protocol.PacketNumber, bytes protocol.ByteCount, isRetransmittable bool)
CanSend(bytesInFlight protocol.ByteCount) bool
MaybeExitSlowStart()
Expand Down
68 changes: 68 additions & 0 deletions internal/congestion/pacer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package congestion

import (
"math"
"time"

"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/utils"
)

const maxBurstSize = 10 * maxDatagramSize

// The pacer implements a token bucket pacing algorithm.
type pacer struct {
budgetAtLastSent protocol.ByteCount
lastSentTime time.Time
getAdjustedBandwidth func() uint64 // in bytes/s
}

func newPacer(getBandwidth func() Bandwidth) *pacer {
p := &pacer{getAdjustedBandwidth: func() uint64 {
// Bandwidth is in bits/s. We need the value in bytes/s.
bw := uint64(getBandwidth() / BytesPerSecond)
// Use a slightly higher value than the actual measured bandwidth.
// RTT variations then won't result in under-utilization of the congestion window.
// Ultimately, this will result in sending packets as acknowledgments are received rather than when timers fire,
// provided the congestion window is fully utilized and acknowledgments arrive at regular intervals.
return bw * 5 / 4
}}
p.budgetAtLastSent = p.maxBurstSize()
return p
}

func (p *pacer) SentPacket(sendTime time.Time, size protocol.ByteCount) {
budget := p.Budget(sendTime)
if size > budget {
p.budgetAtLastSent = 0
} else {
p.budgetAtLastSent = budget - size
}
p.lastSentTime = sendTime
}

func (p *pacer) Budget(now time.Time) protocol.ByteCount {
if p.lastSentTime.IsZero() {
return p.maxBurstSize()
}
budget := p.budgetAtLastSent + (protocol.ByteCount(p.getAdjustedBandwidth())*protocol.ByteCount(now.Sub(p.lastSentTime).Nanoseconds()))/1e9
return utils.MinByteCount(p.maxBurstSize(), budget)
}

func (p *pacer) maxBurstSize() protocol.ByteCount {
return utils.MaxByteCount(
protocol.ByteCount(uint64((protocol.MinPacingDelay+protocol.TimerGranularity).Nanoseconds())*p.getAdjustedBandwidth())/1e9,
maxBurstSize,
)
}

// TimeUntilSend returns when the next packet should be sent.
func (p *pacer) TimeUntilSend() time.Time {
if p.budgetAtLastSent >= maxDatagramSize {
return time.Time{}
}
return p.lastSentTime.Add(utils.MaxDuration(
protocol.MinPacingDelay,
time.Duration(math.Ceil(float64(maxDatagramSize-p.budgetAtLastSent)*1e9/float64(p.getAdjustedBandwidth())))*time.Nanosecond,
))
}
Loading

0 comments on commit b0b996e

Please sign in to comment.