Skip to content

Commit

Permalink
Adjust connection windows size dynamically according to ACK received
Browse files Browse the repository at this point in the history
Signed-off-by: billfort <fxbao@hotmail.com>
  • Loading branch information
billfort committed Jan 27, 2023
1 parent 7ebb3f7 commit 8ea10a7
Show file tree
Hide file tree
Showing 13 changed files with 1,049 additions and 43 deletions.
4 changes: 0 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ type Config struct {
NonStream bool
SessionWindowSize int32 // in bytes
MTU int32 // in bytes
InitialConnectionWindowSize int32 // in packets
MaxConnectionWindowSize int32 // in packets
MinConnectionWindowSize int32 // in packets
MaxAckSeqListSize int32
FlushInterval int32 // in millisecond
Expand All @@ -24,8 +22,6 @@ var DefaultConfig = Config{
NonStream: false,
SessionWindowSize: 4 << 20,
MTU: 1024,
InitialConnectionWindowSize: 16,
MaxConnectionWindowSize: 256,
MinConnectionWindowSize: 1,
MaxAckSeqListSize: 32,
FlushInterval: 10,
Expand Down
37 changes: 25 additions & 12 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Connection struct {
session *Session
localClientID string
remoteClientID string
windowSize uint32
windowSize float64
sendWindowUpdate chan struct{}

sync.RWMutex
Expand All @@ -26,12 +26,12 @@ type Connection struct {
retransmissionTimeout time.Duration
}

func NewConnection(session *Session, localClientID, remoteClientID string) (*Connection, error) {
func NewConnection(session *Session, localClientID, remoteClientID string, initialWindowSize float64) (*Connection, error) {
conn := &Connection{
session: session,
localClientID: localClientID,
remoteClientID: remoteClientID,
windowSize: uint32(session.config.InitialConnectionWindowSize),
windowSize: initialWindowSize,
retransmissionTimeout: time.Duration(session.config.InitialRetransmissionTimeout) * time.Millisecond,
sendWindowUpdate: make(chan struct{}, 1),
timeSentSeq: make(map[uint32]time.Time),
Expand Down Expand Up @@ -76,10 +76,7 @@ func (conn *Connection) ReceiveAck(sequenceID uint32, isSentByMe bool) {
}

if _, ok := conn.resentSeq[sequenceID]; !ok {
conn.windowSize++
if conn.windowSize > uint32(conn.session.config.MaxConnectionWindowSize) {
conn.windowSize = uint32(conn.session.config.MaxConnectionWindowSize)
}
conn.setWindowSize(conn.windowSize + 1)
}

if isSentByMe {
Expand All @@ -100,7 +97,7 @@ func (conn *Connection) ReceiveAck(sequenceID uint32, isSentByMe bool) {
}

func (conn *Connection) waitForSendWindow(ctx context.Context) error {
for conn.SendWindowUsed() >= conn.windowSize {
for float64(conn.SendWindowUsed()) >= conn.windowSize {
select {
case <-conn.sendWindowUpdate:
case <-time.After(maxWait):
Expand Down Expand Up @@ -163,6 +160,13 @@ func (conn *Connection) tx() error {
return err
}
log.Println(err)

// reduce window size
conn.Lock()
conn.setWindowSize(conn.windowSize / 2)
conn.Unlock()
conn.session.updateConnWindowSize()

select {
case conn.session.resendChan <- seq:
seq = 0
Expand Down Expand Up @@ -259,6 +263,7 @@ func (conn *Connection) checkTimeout() error {

threshold := time.Now().Add(-conn.retransmissionTimeout)
conn.Lock()
newResend := false
for seq, t := range conn.timeSentSeq {
if _, ok := conn.resentSeq[seq]; ok {
continue
Expand All @@ -267,15 +272,23 @@ func (conn *Connection) checkTimeout() error {
select {
case conn.session.resendChan <- seq:
conn.resentSeq[seq] = struct{}{}
conn.windowSize /= 2
if conn.windowSize < uint32(conn.session.config.MinConnectionWindowSize) {
conn.windowSize = uint32(conn.session.config.MinConnectionWindowSize)
}
conn.setWindowSize(conn.windowSize / 2)
newResend = true
case <-conn.session.context.Done():
return conn.session.context.Err()
}
}
}
conn.Unlock()
if newResend {
conn.session.updateConnWindowSize()
}
}
}

func (conn *Connection) setWindowSize(n float64) {
if n < float64(conn.session.config.MinConnectionWindowSize) {
n = float64(conn.session.config.MinConnectionWindowSize)
}
conn.windowSize = n
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.13
require (
github.com/golang/protobuf v1.4.1
github.com/imdario/mergo v0.3.8
github.com/stretchr/testify v1.4.0 // indirect
github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a
github.com/wcharczuk/go-chart/v2 v2.1.0
gopkg.in/yaml.v2 v2.2.8 // indirect
)
26 changes: 22 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
Expand All @@ -13,11 +16,24 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a h1:/SGWoUyHDfOgF1G/BQ9x9h1hVj3+mw150Aw4NlWLgtg=
github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a/go.mod h1:/SvBORYxt9wlm8ZbaEFEri6ooOSDcU3ovU0L2eRRdS4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/wcharczuk/go-chart/v2 v2.1.0 h1:tY2slqVQ6bN+yHSnDYwZebLQFkphK4WNrVwnt7CJZ2I=
github.com/wcharczuk/go-chart/v2 v2.1.0/go.mod h1:yx7MvAVNcP/kN9lKXM/NTce4au4DFN99j6i1OwDclNA=
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5 h1:QelT11PB4FXiDEXucrfNckHoFxwt8USGY1ajP1ZF5lM=
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand All @@ -29,6 +45,8 @@ google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2M
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
70 changes: 48 additions & 22 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type Session struct {
bytesReadSentTime time.Time
bytesReadUpdateTime time.Time
remoteBytesRead uint64

sendWindowPacketCount float64 // Equal to sendWindowsSize / sendMtu
}

type SendWithFunc func(localClientID, remoteClientID string, buf []byte, writeTimeout time.Duration) error
Expand All @@ -72,26 +74,27 @@ func NewSession(localAddr, remoteAddr net.Addr, localClientIDs, remoteClientIDs
}

session := &Session{
config: config,
localAddr: localAddr,
remoteAddr: remoteAddr,
localClientIDs: localClientIDs,
remoteClientIDs: remoteClientIDs,
sendWith: sendWith,
sendWindowSize: uint32(config.SessionWindowSize),
recvWindowSize: uint32(config.SessionWindowSize),
sendMtu: uint32(config.MTU),
recvMtu: uint32(config.MTU),
sendWindowStartSeq: MinSequenceID,
sendWindowEndSeq: MinSequenceID,
recvWindowStartSeq: MinSequenceID,
recvWindowUsed: 0,
bytesWrite: 0,
bytesRead: 0,
bytesReadSentTime: time.Now(),
bytesReadUpdateTime: time.Now(),
remoteBytesRead: 0,
onAccept: make(chan struct{}, 1),
config: config,
localAddr: localAddr,
remoteAddr: remoteAddr,
localClientIDs: localClientIDs,
remoteClientIDs: remoteClientIDs,
sendWith: sendWith,
sendWindowSize: uint32(config.SessionWindowSize),
recvWindowSize: uint32(config.SessionWindowSize),
sendMtu: uint32(config.MTU),
recvMtu: uint32(config.MTU),
sendWindowStartSeq: MinSequenceID,
sendWindowEndSeq: MinSequenceID,
recvWindowStartSeq: MinSequenceID,
recvWindowUsed: 0,
bytesWrite: 0,
bytesRead: 0,
bytesReadSentTime: time.Now(),
bytesReadUpdateTime: time.Now(),
remoteBytesRead: 0,
onAccept: make(chan struct{}, 1),
sendWindowPacketCount: float64(config.SessionWindowSize) / float64(config.MTU),
}

session.context, session.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -256,6 +259,8 @@ func (session *Session) ReceiveWith(localClientID, remoteClientID string, buf []
}
}
}

session.updateConnWindowSize()
}

if isEstablished && packet.BytesRead > session.remoteBytesRead {
Expand Down Expand Up @@ -519,6 +524,7 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error {
if packet.Mtu < session.sendMtu {
session.sendMtu = packet.Mtu
}
session.sendWindowPacketCount = float64(session.sendWindowSize) / float64(session.sendMtu)

if len(packet.ClientIds) == 0 {
return ErrInvalidPacket
Expand All @@ -528,9 +534,10 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error {
n = len(packet.ClientIds)
}

initialWindowSize := session.sendWindowPacketCount / float64(n)
connections := make(map[string]*Connection, n)
for i := 0; i < n; i++ {
conn, err := NewConnection(session, session.localClientIDs[i], packet.ClientIds[i])
conn, err := NewConnection(session, session.localClientIDs[i], packet.ClientIds[i], initialWindowSize)
if err != nil {
return err
}
Expand All @@ -540,7 +547,7 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error {

session.remoteClientIDs = packet.ClientIds
session.sendChan = make(chan uint32)
session.resendChan = make(chan uint32, session.config.MaxConnectionWindowSize*int32(n))
session.resendChan = make(chan uint32, int(session.sendWindowPacketCount)+n)
session.sendWindowUpdate = make(chan struct{}, 1)
session.recvDataUpdate = make(chan struct{}, 1)
session.sendBuffer = make([]byte, 0, session.sendMtu)
Expand Down Expand Up @@ -943,3 +950,22 @@ func (session *Session) SetWriteDeadline(t time.Time) error {
func (session *Session) SetLinger(t int32) {
session.config.Linger = t
}

func (session *Session) updateConnWindowSize() {
totalSize := 0.0
for _, conn := range session.connections {
conn.RLock()
totalSize += float64(conn.windowSize)
conn.RUnlock()
}
if totalSize <= 0 {
return
}

for _, conn := range session.connections {
conn.Lock()
n := session.sendWindowPacketCount * (conn.windowSize / totalSize)
conn.setWindowSize(n)
conn.Unlock()
}
}
Loading

0 comments on commit 8ea10a7

Please sign in to comment.