Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust connection windows size dynamically according to ACK received #4

Merged
merged 1 commit into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ type Config struct {
NonStream bool
SessionWindowSize int32 // in bytes
MTU int32 // in bytes
InitialConnectionWindowSize int32 // in packets
MaxConnectionWindowSize int32 // in packets
MinConnectionWindowSize int32 // in packets
MaxAckSeqListSize int32
FlushInterval int32 // in millisecond
Expand All @@ -24,8 +22,6 @@ var DefaultConfig = Config{
NonStream: false,
SessionWindowSize: 4 << 20,
MTU: 1024,
InitialConnectionWindowSize: 16,
MaxConnectionWindowSize: 256,
MinConnectionWindowSize: 1,
MaxAckSeqListSize: 32,
FlushInterval: 10,
Expand Down
37 changes: 25 additions & 12 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Connection struct {
session *Session
localClientID string
remoteClientID string
windowSize uint32
windowSize float64
sendWindowUpdate chan struct{}

sync.RWMutex
Expand All @@ -26,12 +26,12 @@ type Connection struct {
retransmissionTimeout time.Duration
}

func NewConnection(session *Session, localClientID, remoteClientID string) (*Connection, error) {
func NewConnection(session *Session, localClientID, remoteClientID string, initialWindowSize float64) (*Connection, error) {
conn := &Connection{
session: session,
localClientID: localClientID,
remoteClientID: remoteClientID,
windowSize: uint32(session.config.InitialConnectionWindowSize),
yilunzhang marked this conversation as resolved.
Show resolved Hide resolved
windowSize: initialWindowSize,
retransmissionTimeout: time.Duration(session.config.InitialRetransmissionTimeout) * time.Millisecond,
sendWindowUpdate: make(chan struct{}, 1),
timeSentSeq: make(map[uint32]time.Time),
Expand Down Expand Up @@ -76,10 +76,7 @@ func (conn *Connection) ReceiveAck(sequenceID uint32, isSentByMe bool) {
}

if _, ok := conn.resentSeq[sequenceID]; !ok {
conn.windowSize++
if conn.windowSize > uint32(conn.session.config.MaxConnectionWindowSize) {
conn.windowSize = uint32(conn.session.config.MaxConnectionWindowSize)
}
conn.setWindowSize(conn.windowSize + 1)
}

if isSentByMe {
Expand All @@ -100,7 +97,7 @@ func (conn *Connection) ReceiveAck(sequenceID uint32, isSentByMe bool) {
}

func (conn *Connection) waitForSendWindow(ctx context.Context) error {
for conn.SendWindowUsed() >= conn.windowSize {
for float64(conn.SendWindowUsed()) >= conn.windowSize {
select {
case <-conn.sendWindowUpdate:
case <-time.After(maxWait):
Expand Down Expand Up @@ -163,6 +160,13 @@ func (conn *Connection) tx() error {
return err
}
log.Println(err)

// reduce window size
conn.Lock()
conn.setWindowSize(conn.windowSize / 2)
conn.Unlock()
conn.session.updateConnWindowSize()

select {
case conn.session.resendChan <- seq:
seq = 0
Expand Down Expand Up @@ -259,6 +263,7 @@ func (conn *Connection) checkTimeout() error {

threshold := time.Now().Add(-conn.retransmissionTimeout)
conn.Lock()
newResend := false
for seq, t := range conn.timeSentSeq {
if _, ok := conn.resentSeq[seq]; ok {
continue
Expand All @@ -267,15 +272,23 @@ func (conn *Connection) checkTimeout() error {
select {
case conn.session.resendChan <- seq:
conn.resentSeq[seq] = struct{}{}
conn.windowSize /= 2
if conn.windowSize < uint32(conn.session.config.MinConnectionWindowSize) {
conn.windowSize = uint32(conn.session.config.MinConnectionWindowSize)
}
conn.setWindowSize(conn.windowSize / 2)
newResend = true
case <-conn.session.context.Done():
return conn.session.context.Err()
}
}
}
conn.Unlock()
if newResend {
conn.session.updateConnWindowSize()
}
}
}

func (conn *Connection) setWindowSize(n float64) {
if n < float64(conn.session.config.MinConnectionWindowSize) {
n = float64(conn.session.config.MinConnectionWindowSize)
}
conn.windowSize = n
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ go 1.13
require (
github.com/golang/protobuf v1.4.1
github.com/imdario/mergo v0.3.8
github.com/stretchr/testify v1.4.0 // indirect
github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a
gopkg.in/yaml.v2 v2.2.8 // indirect
)
19 changes: 15 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
Expand All @@ -13,11 +14,19 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a h1:/SGWoUyHDfOgF1G/BQ9x9h1hVj3+mw150Aw4NlWLgtg=
github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a/go.mod h1:/SvBORYxt9wlm8ZbaEFEri6ooOSDcU3ovU0L2eRRdS4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand All @@ -29,6 +38,8 @@ google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2M
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
70 changes: 48 additions & 22 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type Session struct {
bytesReadSentTime time.Time
bytesReadUpdateTime time.Time
remoteBytesRead uint64

sendWindowPacketCount float64 // Equal to sendWindowsSize / sendMtu
}

type SendWithFunc func(localClientID, remoteClientID string, buf []byte, writeTimeout time.Duration) error
Expand All @@ -72,26 +74,27 @@ func NewSession(localAddr, remoteAddr net.Addr, localClientIDs, remoteClientIDs
}

session := &Session{
config: config,
localAddr: localAddr,
remoteAddr: remoteAddr,
localClientIDs: localClientIDs,
remoteClientIDs: remoteClientIDs,
sendWith: sendWith,
sendWindowSize: uint32(config.SessionWindowSize),
recvWindowSize: uint32(config.SessionWindowSize),
sendMtu: uint32(config.MTU),
recvMtu: uint32(config.MTU),
sendWindowStartSeq: MinSequenceID,
sendWindowEndSeq: MinSequenceID,
recvWindowStartSeq: MinSequenceID,
recvWindowUsed: 0,
bytesWrite: 0,
bytesRead: 0,
bytesReadSentTime: time.Now(),
bytesReadUpdateTime: time.Now(),
remoteBytesRead: 0,
onAccept: make(chan struct{}, 1),
config: config,
localAddr: localAddr,
remoteAddr: remoteAddr,
localClientIDs: localClientIDs,
remoteClientIDs: remoteClientIDs,
sendWith: sendWith,
sendWindowSize: uint32(config.SessionWindowSize),
recvWindowSize: uint32(config.SessionWindowSize),
sendMtu: uint32(config.MTU),
recvMtu: uint32(config.MTU),
sendWindowStartSeq: MinSequenceID,
sendWindowEndSeq: MinSequenceID,
recvWindowStartSeq: MinSequenceID,
recvWindowUsed: 0,
bytesWrite: 0,
bytesRead: 0,
bytesReadSentTime: time.Now(),
bytesReadUpdateTime: time.Now(),
remoteBytesRead: 0,
onAccept: make(chan struct{}, 1),
sendWindowPacketCount: float64(config.SessionWindowSize) / float64(config.MTU),
}

session.context, session.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -256,6 +259,8 @@ func (session *Session) ReceiveWith(localClientID, remoteClientID string, buf []
}
}
}

session.updateConnWindowSize()
}

if isEstablished && packet.BytesRead > session.remoteBytesRead {
Expand Down Expand Up @@ -519,6 +524,7 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error {
if packet.Mtu < session.sendMtu {
session.sendMtu = packet.Mtu
}
session.sendWindowPacketCount = float64(session.sendWindowSize) / float64(session.sendMtu)

if len(packet.ClientIds) == 0 {
return ErrInvalidPacket
Expand All @@ -528,9 +534,10 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error {
n = len(packet.ClientIds)
}

initialWindowSize := session.sendWindowPacketCount / float64(n)
connections := make(map[string]*Connection, n)
for i := 0; i < n; i++ {
conn, err := NewConnection(session, session.localClientIDs[i], packet.ClientIds[i])
conn, err := NewConnection(session, session.localClientIDs[i], packet.ClientIds[i], initialWindowSize)
if err != nil {
return err
}
Expand All @@ -540,7 +547,7 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error {

session.remoteClientIDs = packet.ClientIds
session.sendChan = make(chan uint32)
session.resendChan = make(chan uint32, session.config.MaxConnectionWindowSize*int32(n))
session.resendChan = make(chan uint32, int(session.sendWindowPacketCount)+n)
session.sendWindowUpdate = make(chan struct{}, 1)
session.recvDataUpdate = make(chan struct{}, 1)
session.sendBuffer = make([]byte, 0, session.sendMtu)
Expand Down Expand Up @@ -943,3 +950,22 @@ func (session *Session) SetWriteDeadline(t time.Time) error {
func (session *Session) SetLinger(t int32) {
session.config.Linger = t
}

func (session *Session) updateConnWindowSize() {
totalSize := 0.0
for _, conn := range session.connections {
conn.RLock()
totalSize += float64(conn.windowSize)
yilunzhang marked this conversation as resolved.
Show resolved Hide resolved
conn.RUnlock()
}
if totalSize <= 0 {
return
}

for _, conn := range session.connections {
conn.Lock()
n := session.sendWindowPacketCount * (conn.windowSize / totalSize)
conn.setWindowSize(n)
yilunzhang marked this conversation as resolved.
Show resolved Hide resolved
conn.Unlock()
}
}
Loading