From d46ac8c3c38b7c186c635395af2632a031f125a1 Mon Sep 17 00:00:00 2001 From: billfort Date: Tue, 17 Jan 2023 11:30:57 -0700 Subject: [PATCH] Adjust connection windows size dynamically according to ACK received Signed-off-by: billfort --- config.go | 4 - connection.go | 37 ++++-- go.mod | 2 +- go.sum | 19 ++- session.go | 70 +++++++--- test/exception_test.go | 267 +++++++++++++++++++++++++++++++++++++ test/latency_test.go | 34 +++++ test/loss_test.go | 34 +++++ test/pub_test.go | 75 +++++++++++ test/testsession.go | 288 ++++++++++++++++++++++++++++++++++++++++ test/throughput_test.go | 170 ++++++++++++++++++++++++ 11 files changed, 957 insertions(+), 43 deletions(-) create mode 100644 test/exception_test.go create mode 100644 test/latency_test.go create mode 100644 test/loss_test.go create mode 100644 test/pub_test.go create mode 100644 test/testsession.go create mode 100644 test/throughput_test.go diff --git a/config.go b/config.go index 05280e8..b70a1cb 100644 --- a/config.go +++ b/config.go @@ -6,8 +6,6 @@ type Config struct { NonStream bool SessionWindowSize int32 // in bytes MTU int32 // in bytes - InitialConnectionWindowSize int32 // in packets - MaxConnectionWindowSize int32 // in packets MinConnectionWindowSize int32 // in packets MaxAckSeqListSize int32 FlushInterval int32 // in millisecond @@ -24,8 +22,6 @@ var DefaultConfig = Config{ NonStream: false, SessionWindowSize: 4 << 20, MTU: 1024, - InitialConnectionWindowSize: 16, - MaxConnectionWindowSize: 256, MinConnectionWindowSize: 1, MaxAckSeqListSize: 32, FlushInterval: 10, diff --git a/connection.go b/connection.go index 040732c..1070fe9 100644 --- a/connection.go +++ b/connection.go @@ -16,7 +16,7 @@ type Connection struct { session *Session localClientID string remoteClientID string - windowSize uint32 + windowSize float64 sendWindowUpdate chan struct{} sync.RWMutex @@ -26,12 +26,12 @@ type Connection struct { retransmissionTimeout time.Duration } -func NewConnection(session *Session, localClientID, remoteClientID string) (*Connection, error) { +func NewConnection(session *Session, localClientID, remoteClientID string, initialWindowSize float64) (*Connection, error) { conn := &Connection{ session: session, localClientID: localClientID, remoteClientID: remoteClientID, - windowSize: uint32(session.config.InitialConnectionWindowSize), + windowSize: initialWindowSize, retransmissionTimeout: time.Duration(session.config.InitialRetransmissionTimeout) * time.Millisecond, sendWindowUpdate: make(chan struct{}, 1), timeSentSeq: make(map[uint32]time.Time), @@ -76,10 +76,7 @@ func (conn *Connection) ReceiveAck(sequenceID uint32, isSentByMe bool) { } if _, ok := conn.resentSeq[sequenceID]; !ok { - conn.windowSize++ - if conn.windowSize > uint32(conn.session.config.MaxConnectionWindowSize) { - conn.windowSize = uint32(conn.session.config.MaxConnectionWindowSize) - } + conn.setWindowSize(conn.windowSize + 1) } if isSentByMe { @@ -100,7 +97,7 @@ func (conn *Connection) ReceiveAck(sequenceID uint32, isSentByMe bool) { } func (conn *Connection) waitForSendWindow(ctx context.Context) error { - for conn.SendWindowUsed() >= conn.windowSize { + for float64(conn.SendWindowUsed()) >= conn.windowSize { select { case <-conn.sendWindowUpdate: case <-time.After(maxWait): @@ -163,6 +160,13 @@ func (conn *Connection) tx() error { return err } log.Println(err) + + // reduce window size + conn.Lock() + conn.setWindowSize(conn.windowSize / 2) + conn.Unlock() + conn.session.updateConnWindowSize() + select { case conn.session.resendChan <- seq: seq = 0 @@ -259,6 +263,7 @@ func (conn *Connection) checkTimeout() error { threshold := time.Now().Add(-conn.retransmissionTimeout) conn.Lock() + newResend := false for seq, t := range conn.timeSentSeq { if _, ok := conn.resentSeq[seq]; ok { continue @@ -267,15 +272,23 @@ func (conn *Connection) checkTimeout() error { select { case conn.session.resendChan <- seq: conn.resentSeq[seq] = struct{}{} - conn.windowSize /= 2 - if conn.windowSize < uint32(conn.session.config.MinConnectionWindowSize) { - conn.windowSize = uint32(conn.session.config.MinConnectionWindowSize) - } + conn.setWindowSize(conn.windowSize / 2) + newResend = true case <-conn.session.context.Done(): return conn.session.context.Err() } } } conn.Unlock() + if newResend { + conn.session.updateConnWindowSize() + } + } +} + +func (conn *Connection) setWindowSize(n float64) { + if n < float64(conn.session.config.MinConnectionWindowSize) { + n = float64(conn.session.config.MinConnectionWindowSize) } + conn.windowSize = n } diff --git a/go.mod b/go.mod index b46224c..ce38a84 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,6 @@ go 1.13 require ( github.com/golang/protobuf v1.4.1 github.com/imdario/mergo v0.3.8 - github.com/stretchr/testify v1.4.0 // indirect + github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a gopkg.in/yaml.v2 v2.2.8 // indirect ) diff --git a/go.sum b/go.sum index 1d4ca43..4969901 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -13,11 +14,19 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/imdario/mergo v0.3.8 h1:CGgOkSJeqMRmt0D9XLWExdT4m4F1vd3FV3VPt+0VxkQ= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a h1:/SGWoUyHDfOgF1G/BQ9x9h1hVj3+mw150Aw4NlWLgtg= +github.com/nknorg/mockconn-go v0.0.0-20230125231524-d664e728352a/go.mod h1:/SvBORYxt9wlm8ZbaEFEri6ooOSDcU3ovU0L2eRRdS4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -29,6 +38,8 @@ google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2M google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/session.go b/session.go index a621be8..6c55fc4 100644 --- a/session.go +++ b/session.go @@ -61,6 +61,8 @@ type Session struct { bytesReadSentTime time.Time bytesReadUpdateTime time.Time remoteBytesRead uint64 + + sendWindowPacketCount float64 // Equal to sendWindowsSize / sendMtu } type SendWithFunc func(localClientID, remoteClientID string, buf []byte, writeTimeout time.Duration) error @@ -72,26 +74,27 @@ func NewSession(localAddr, remoteAddr net.Addr, localClientIDs, remoteClientIDs } session := &Session{ - config: config, - localAddr: localAddr, - remoteAddr: remoteAddr, - localClientIDs: localClientIDs, - remoteClientIDs: remoteClientIDs, - sendWith: sendWith, - sendWindowSize: uint32(config.SessionWindowSize), - recvWindowSize: uint32(config.SessionWindowSize), - sendMtu: uint32(config.MTU), - recvMtu: uint32(config.MTU), - sendWindowStartSeq: MinSequenceID, - sendWindowEndSeq: MinSequenceID, - recvWindowStartSeq: MinSequenceID, - recvWindowUsed: 0, - bytesWrite: 0, - bytesRead: 0, - bytesReadSentTime: time.Now(), - bytesReadUpdateTime: time.Now(), - remoteBytesRead: 0, - onAccept: make(chan struct{}, 1), + config: config, + localAddr: localAddr, + remoteAddr: remoteAddr, + localClientIDs: localClientIDs, + remoteClientIDs: remoteClientIDs, + sendWith: sendWith, + sendWindowSize: uint32(config.SessionWindowSize), + recvWindowSize: uint32(config.SessionWindowSize), + sendMtu: uint32(config.MTU), + recvMtu: uint32(config.MTU), + sendWindowStartSeq: MinSequenceID, + sendWindowEndSeq: MinSequenceID, + recvWindowStartSeq: MinSequenceID, + recvWindowUsed: 0, + bytesWrite: 0, + bytesRead: 0, + bytesReadSentTime: time.Now(), + bytesReadUpdateTime: time.Now(), + remoteBytesRead: 0, + onAccept: make(chan struct{}, 1), + sendWindowPacketCount: float64(config.SessionWindowSize) / float64(config.MTU), } session.context, session.cancel = context.WithCancel(context.Background()) @@ -256,6 +259,8 @@ func (session *Session) ReceiveWith(localClientID, remoteClientID string, buf [] } } } + + session.updateConnWindowSize() } if isEstablished && packet.BytesRead > session.remoteBytesRead { @@ -519,6 +524,7 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error { if packet.Mtu < session.sendMtu { session.sendMtu = packet.Mtu } + session.sendWindowPacketCount = float64(session.sendWindowSize) / float64(session.sendMtu) if len(packet.ClientIds) == 0 { return ErrInvalidPacket @@ -528,9 +534,10 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error { n = len(packet.ClientIds) } + initialWindowSize := session.sendWindowPacketCount / float64(n) connections := make(map[string]*Connection, n) for i := 0; i < n; i++ { - conn, err := NewConnection(session, session.localClientIDs[i], packet.ClientIds[i]) + conn, err := NewConnection(session, session.localClientIDs[i], packet.ClientIds[i], initialWindowSize) if err != nil { return err } @@ -540,7 +547,7 @@ func (session *Session) handleHandshakePacket(packet *pb.Packet) error { session.remoteClientIDs = packet.ClientIds session.sendChan = make(chan uint32) - session.resendChan = make(chan uint32, session.config.MaxConnectionWindowSize*int32(n)) + session.resendChan = make(chan uint32, int(session.sendWindowPacketCount)+n) session.sendWindowUpdate = make(chan struct{}, 1) session.recvDataUpdate = make(chan struct{}, 1) session.sendBuffer = make([]byte, 0, session.sendMtu) @@ -943,3 +950,22 @@ func (session *Session) SetWriteDeadline(t time.Time) error { func (session *Session) SetLinger(t int32) { session.config.Linger = t } + +func (session *Session) updateConnWindowSize() { + totalSize := 0.0 + for _, conn := range session.connections { + conn.RLock() + totalSize += float64(conn.windowSize) + conn.RUnlock() + } + if totalSize <= 0 { + return + } + + for _, conn := range session.connections { + conn.Lock() + n := session.sendWindowPacketCount * (conn.windowSize / totalSize) + conn.setWindowSize(n) + conn.Unlock() + } +} diff --git a/test/exception_test.go b/test/exception_test.go new file mode 100644 index 0000000..d311865 --- /dev/null +++ b/test/exception_test.go @@ -0,0 +1,267 @@ +package test + +import ( + "fmt" + "testing" + "time" + + mockconn "github.com/nknorg/mockconn-go" +) + +// go test -v -run=TestCloseRemoteRead +func TestCloseRemoteRead(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + tc := &TestCase{id: id, name: "Two connections, the second one will close remote read in 2 seconds", bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + baseConf.WriteTimeout = 3 * time.Second + baseConf.ReadTimeout = 3 * time.Second + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = baseConf + tc.numClients = len(tc.mockConfigs) + + tc = closeRemoteRead(tc) + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestCloseRemoteWrite +func TestCloseRemoteWrite(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + tc := &TestCase{id: id, name: "Two connections, the second one will close write in 2 seconds", bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + baseConf.WriteTimeout = 3 * time.Second + baseConf.ReadTimeout = 3 * time.Second + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = baseConf + tc.numClients = len(tc.mockConfigs) + + tc = closeRemoteWrite(tc) + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestCloseLocalRead +func TestCloseLocalRead(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + tc := &TestCase{id: id, name: "Two connections, the second one will close remote read in 2 seconds", bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + baseConf.WriteTimeout = 3 * time.Second + baseConf.ReadTimeout = 3 * time.Second + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = baseConf + tc.numClients = len(tc.mockConfigs) + + tc = closeLocalRead(tc) + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestCloseLocalWrite +func TestCloseLocalWrite(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + tc := &TestCase{id: id, name: "Two connections, the second one will close write in 2 seconds", bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + baseConf.WriteTimeout = 3 * time.Second + baseConf.ReadTimeout = 3 * time.Second + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = baseConf + tc.numClients = len(tc.mockConfigs) + + tc = closeLocalWrite(tc) + testResult = append(testResult, tc) + PrintResult(testResult) +} + +func closeRemoteRead(tc *TestCase) *TestCase { + fmt.Printf("\n>>>>>> Case %v, %v\n", id, tc.name) + + var ts TestSession + ts.Create(tc.mockConfigs, tc.numClients) + ts.DialUp() + + writeChan := make(chan int64, 3) + readChan := make(chan int64, 3) + go ts.write(ts.localSess, tc.bytesToSend, writeChan) + go ts.read(ts.remoteSess, readChan) + + time.Sleep(2 * time.Second) + ts.CloseOneConnectionRead(ts.remoteSess, "1") + + bytesReceived := <-readChan + count := <-readChan + ms := <-readChan + + speed := float64(bytesReceived) / (1 << 20) / (float64(ms) / 1000.0) + throughput := float64(count) / (float64(ms) / 1000.0) + + fmt.Printf("\n%v received %v bytes at %.3f MB/s, throughput:%.1f packets/s, duration: %v ms \n", + ts.remoteSess.LocalAddr(), bytesReceived, speed, throughput, ms) + + tc.duration = ms + tc.speed = speed + + return tc +} + +// go test -v -run=TestCloseAndOpenRemoteRead +func TestCloseAndOpenRemoteRead(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + toSend := 80 << 20 + tc := &TestCase{id: id, name: "Two connections, the second one will close remote read in 2 seconds", bytesToSend: toSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + baseConf.WriteTimeout = 3 * time.Second + baseConf.ReadTimeout = 3 * time.Second + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = baseConf + tc.numClients = len(tc.mockConfigs) + + tc = pauseAndResumeRemoteRead(tc) + testResult = append(testResult, tc) + PrintResult(testResult) +} + +func pauseAndResumeRemoteRead(tc *TestCase) *TestCase { + fmt.Printf("\n>>>>>> Case %v, %v\n", id, tc.name) + + var ts TestSession + ts.Create(tc.mockConfigs, tc.numClients) + ts.DialUp() + + writeChan := make(chan int64, 3) + readChan := make(chan int64, 3) + go ts.write(ts.localSess, tc.bytesToSend, writeChan) + go ts.read(ts.remoteSess, readChan) + + time.Sleep(5 * time.Second) + ts.PauseOneConnectionRead(ts.remoteSess, "1") + time.Sleep(1 * time.Second) + ts.ResumeOneConnectionRead(ts.remoteSess, "1") + + bytesReceived := <-readChan + count := <-readChan + ms := <-readChan + + speed := float64(bytesReceived) / (1 << 20) / (float64(ms) / 1000.0) + throughput := float64(count) / (float64(ms) / 1000.0) + + fmt.Printf("\n%v received %v bytes at %.3f MB/s, throughput:%.1f packets/s, duration: %v ms \n", + ts.remoteSess.LocalAddr(), bytesReceived, speed, throughput, ms) + + tc.duration = ms + tc.speed = speed + + return tc +} + +func closeLocalRead(tc *TestCase) *TestCase { + fmt.Printf("\n>>>>>> Case %v, %v\n", id, tc.name) + + var ts TestSession + ts.Create(tc.mockConfigs, tc.numClients) + ts.DialUp() + + writeChan := make(chan int64, 3) + readChan := make(chan int64, 3) + go ts.write(ts.localSess, tc.bytesToSend, writeChan) + go ts.read(ts.remoteSess, readChan) + + time.Sleep(2 * time.Second) + ts.CloseOneConnectionRead(ts.remoteSess, "1") + + bytesReceived := <-readChan + count := <-readChan + ms := <-readChan + + speed := float64(bytesReceived) / (1 << 20) / (float64(ms) / 1000.0) + throughput := float64(count) / (float64(ms) / 1000.0) + + fmt.Printf("\n%v received %v bytes at %.3f MB/s, throughput:%.1f packets/s, duration: %v ms \n", + ts.remoteSess.LocalAddr(), bytesReceived, speed, throughput, ms) + + tc.duration = ms + tc.speed = speed + + return tc +} + +func closeRemoteWrite(tc *TestCase) *TestCase { + fmt.Printf("\n>>>>>> Case %v, %v\n", id, tc.name) + + var ts TestSession + ts.Create(tc.mockConfigs, tc.numClients) + ts.DialUp() + + writeChan := make(chan int64, 3) + readChan := make(chan int64, 3) + go ts.write(ts.localSess, tc.bytesToSend, writeChan) + go ts.read(ts.remoteSess, readChan) + + time.Sleep(2 * time.Second) + ts.CloseOneConnectionWrite(ts.remoteSess, "1") + + bytesReceived := <-readChan + count := <-readChan + ms := <-readChan + + speed := float64(bytesReceived) / (1 << 20) / (float64(ms) / 1000.0) + throughput := float64(count) / (float64(ms) / 1000.0) + + fmt.Printf("\n%v received %v bytes at %.3f MB/s, throughput:%.1f packets/s, duration: %v ms \n", + ts.remoteSess.LocalAddr(), bytesReceived, speed, throughput, ms) + + tc.duration = ms + tc.speed = speed + + return tc +} + +func closeLocalWrite(tc *TestCase) *TestCase { + fmt.Printf("\n>>>>>> Case %v, %v\n", id, tc.name) + + var ts TestSession + ts.Create(tc.mockConfigs, tc.numClients) + ts.DialUp() + + writeChan := make(chan int64, 3) + readChan := make(chan int64, 3) + go ts.write(ts.localSess, tc.bytesToSend, writeChan) + go ts.read(ts.remoteSess, readChan) + + time.Sleep(2 * time.Second) + ts.CloseOneConnectionWrite(ts.localSess, "1") + + bytesReceived := <-readChan + count := <-readChan + ms := <-readChan + + speed := float64(bytesReceived) / (1 << 20) / (float64(ms) / 1000.0) + throughput := float64(count) / (float64(ms) / 1000.0) + + fmt.Printf("\n%v received %v bytes at %.3f MB/s, throughput:%.1f packets/s, duration: %v ms \n", + ts.remoteSess.LocalAddr(), bytesReceived, speed, throughput, ms) + + tc.duration = ms + tc.speed = speed + + return tc +} diff --git a/test/latency_test.go b/test/latency_test.go new file mode 100644 index 0000000..9a66517 --- /dev/null +++ b/test/latency_test.go @@ -0,0 +1,34 @@ +package test + +import ( + "fmt" + "testing" + + mockconn "github.com/nknorg/mockconn-go" +) + +// go test -v -run=TestLowAndHighLatency +func TestLowAndHighLatency(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + conf1 := baseConf + conf1.Latency = lowLatency + conf2 := baseConf + conf2.Latency = highLatency + toSend := 80 << 20 + + tc := &TestCase{id: id, name: fmt.Sprintf("Two clients same throughput %v packets/s, low latency %v, high latency %v", + conf1.Throughput, conf1.Latency, conf2.Latency), bytesToSend: toSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = conf1 + tc.mockConfigs["1"] = conf2 + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "low_and_high_latency") + testResult = append(testResult, tc) + + PrintResult(testResult) +} diff --git a/test/loss_test.go b/test/loss_test.go new file mode 100644 index 0000000..8abfbee --- /dev/null +++ b/test/loss_test.go @@ -0,0 +1,34 @@ +package test + +import ( + "fmt" + "testing" + + mockconn "github.com/nknorg/mockconn-go" +) + +// go test -v -run=TestLowAndHighLoss +func TestLowAndHighLoss(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + conf1, conf2 := baseConf, baseConf + conf1.Loss = 0.01 + conf2.Loss = 0.1 + toSend := 8 << 20 + + tc := &TestCase{id: id, name: fmt.Sprintf("Base case with two connections with low loss %v, high loss %v", + conf1.Loss, conf2.Loss), bytesToSend: toSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = conf1 + tc.mockConfigs["2"] = conf2 + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "Base_and_low_and_high_loss") + testResult = append(testResult, tc) + + PrintResult(testResult) +} diff --git a/test/pub_test.go b/test/pub_test.go new file mode 100644 index 0000000..e5427e4 --- /dev/null +++ b/test/pub_test.go @@ -0,0 +1,75 @@ +package test + +import ( + "fmt" + "time" + + mockconn "github.com/nknorg/mockconn-go" +) + +type TestCase struct { + id int + name string + numClients int + bytesToSend int + mockConfigs map[string]mockconn.ConnConfig // map localClientID to mock config + duration int64 + speed float64 +} + +const ( + lowThroughput = 128 // packets / second + midThroughput = 1024 // packets / second + highThroughput = 2048 // packets / second + + lowLatency = 50 * time.Millisecond // ms + highLatency = 500 * time.Millisecond //ms + lowLoss = 0.01 // 1% loss + highLoss = 0.1 // 10% loss + + bytesToSend = 8 << 20 +) + +var baseConf = mockconn.ConnConfig{Addr1: "Alice", Addr2: "Bob", Throughput: midThroughput, Latency: lowLatency, + WriteTimeout: 3 * time.Second, ReadTimeout: 3 * time.Second} + +var testResult []*TestCase +var id = 0 + +func run(tc *TestCase, imgName string) *TestCase { + fmt.Printf("\n>>>>>> Case %v, %v\n", id, tc.name) + + var ts TestSession + ts.Create(tc.mockConfigs, tc.numClients) + ts.DialUp() + + writeChan := make(chan int64, 3) + readChan := make(chan int64, 3) + go ts.write(ts.localSess, tc.bytesToSend, writeChan) + go ts.read(ts.remoteSess, readChan) + + bytesReceived := <-readChan + count := <-readChan + ms := <-readChan + + speed := float64(bytesReceived) / (1 << 20) / (float64(ms) / 1000.0) + throughput := float64(count) / (float64(ms) / 1000.0) + + fmt.Printf("\n%v received %v bytes at %.3f MB/s, throughput:%.1f packets/s, duration: %v ms \n", + ts.remoteSess.LocalAddr(), bytesReceived, speed, throughput, ms) + + tc.duration = ms + tc.speed = speed + + return tc +} + +// Print test result to compare previous version and new version +func PrintResult(testResult []*TestCase) { + fmt.Printf("\nid \tconn \tspeed \t\tduration \ttest case description\n") + for _, tc := range testResult { + fmt.Printf("%v \t%v \t%.3f MB/s \t%v ms \t%v \n", + tc.id, tc.numClients, tc.speed, tc.duration, tc.name) + } + fmt.Println() +} diff --git a/test/testsession.go b/test/testsession.go new file mode 100644 index 0000000..d9cca64 --- /dev/null +++ b/test/testsession.go @@ -0,0 +1,288 @@ +package test + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "log" + "net" + "strconv" + "time" + + mockconn "github.com/nknorg/mockconn-go" + ncp "github.com/nknorg/ncp-go" +) + +type TestSession struct { + localSess *ncp.Session + remoteSess *ncp.Session + mockConfigs map[string]mockconn.ConnConfig // map clinetId to mockConfig + numClients int + + localConns map[string]net.Conn + remoteConns map[string]net.Conn + + closeChan chan struct{} // indicate this session is closed +} + +func (ts *TestSession) Create(confs map[string]mockconn.ConnConfig, numClients int) { + ts.mockConfigs = confs + + ts.localConns = make(map[string]net.Conn) + ts.remoteConns = make(map[string]net.Conn) + ts.closeChan = make(chan struct{}) + ts.numClients = numClients + + clientIDs := make([]string, 0) + for i := 0; i < numClients; i++ { + clientId := strconv.Itoa(i) + clientIDs = append(clientIDs, clientId) + + conf := confs[clientId] + localConn, remoteConn, err := mockconn.NewMockConn(&conf) + + if err == nil { + ts.localConns[clientId] = localConn + ts.remoteConns[clientId] = remoteConn + } else { + log.Fatalln("mockconn.NewMockConn err:", err) + } + } + + sessionConfig := &ncp.Config{} + localSess, _ := ncp.NewSession(mockconn.NewClientAddr("Alice"), mockconn.NewClientAddr("Bob"), clientIDs, clientIDs, + func(localClientID, remoteClientID string, buf []byte, writeTimeout time.Duration) (err error) { + netconn, ok := ts.localConns[localClientID] + if ok { + _, err = netconn.Write(buf) + } else { + err = errors.New("sendwith can't get connection") + } + return err + }, sessionConfig) + + remoteSess, _ := ncp.NewSession(mockconn.NewClientAddr("Bob"), mockconn.NewClientAddr("Alice"), clientIDs, clientIDs, + func(localClientID, remoteClientID string, buf []byte, writeTimeout time.Duration) (err error) { + conn, ok := ts.remoteConns[localClientID] + if ok { + _, err = conn.Write(buf) + } else { + err = errors.New("sendwith can't get connection") + } + return err + }, sessionConfig) + + ts.localSess = localSess + ts.remoteSess = remoteSess + + go func() { + for clientId, conn := range ts.localConns { + go ts.networkRead(ts.localSess, conn, clientId) + } + for clientId, conn := range ts.remoteConns { + go ts.networkRead(ts.remoteSess, conn, clientId) + } + }() +} + +func (ts *TestSession) networkRead(s *ncp.Session, conn net.Conn, clientId string) { + count := 0 + var zeroTime time.Time + var t1 time.Time + +loop: + for { + b := make([]byte, 1500) + n, err := conn.Read(b) + if err != nil { + time.Sleep(50 * time.Millisecond) + // fmt.Printf("testsession.networkRead session %v conn %v conn.Read error: %v\n", s.LocalAddr(), clientId, err) + continue + } + + if t1 == zeroTime { + t1 = time.Now() + } + + err = s.ReceiveWith(clientId, clientId, b[:n]) + if err != nil { + fmt.Printf("testsession.networkRead session %v conn %v s.ReceiveWith error: %v\n", s.LocalAddr(), clientId, err) + time.Sleep(10 * time.Millisecond) + } + count++ + + select { + case <-ts.closeChan: + break loop + default: + } + } + + fmt.Printf("TestSession.networkRead session %v conn %v exit read, read times: %v\n", s.LocalAddr().String(), clientId, count) +} + +func (ts *TestSession) DialUp() { + go func() { + for { + time.Sleep(200 * time.Millisecond) + err := ts.remoteSess.Accept() + if err == nil { + break + } + } + }() + + ctx := context.Background() + err := ts.localSess.Dial(ctx) + if err != nil { + fmt.Printf("ts.localSess.Dial error: %v\n", err) + return + } +} + +func (ts *TestSession) write(s *ncp.Session, numBytes int, writeChan chan int64) error { + b := make([]byte, 4) + binary.LittleEndian.PutUint32(b, uint32(numBytes)) + _, err := s.Write(b) + if err != nil { + return err + } + + var bytesSent, count int64 + t1 := time.Now() + for i := 0; i < numBytes/1024; i++ { + b := make([]byte, 1024) + n, err := s.Write(b) + if err != nil { + log.Fatal("testsession.write s.Write err ", err) + return err + } + if n != len(b) { + return fmt.Errorf("sent %d instead of %d bytes", n, len(b)) + } + bytesSent += int64(len(b)) + count++ + } + dur := time.Since(t1) + + writeChan <- bytesSent + writeChan <- count + writeChan <- dur.Milliseconds() + + return nil +} + +func (ts *TestSession) read(s *ncp.Session, readChan chan int64) error { + b := make([]byte, 4) + n := 0 + for { + m, err := s.Read(b[n:]) + if err != nil { + return err + } + n += m + if n >= 4 { + break + } + } + numBytes := int(binary.LittleEndian.Uint32(b)) + + b = make([]byte, 1024) + var bytesReceived, count int64 + t1 := time.Now() + for { + n, err := s.Read(b) + if err != nil { + log.Fatal("testsession.read s.Read err ", err) + return err + } + bytesReceived += int64(n) + count++ + + if bytesReceived == int64(numBytes) { + dur := time.Since(t1) + readChan <- bytesReceived + readChan <- count + readChan <- dur.Milliseconds() + return nil + } + } +} + +func (ts *TestSession) Close() { + for _, conn := range ts.localConns { + conn.Close() + } + for _, conn := range ts.remoteConns { + conn.Close() + } + ts.localSess.Close() + ts.remoteSess.Close() + + <-ts.closeChan + <-ts.closeChan +} + +func (ts *TestSession) CloseOneConnectionRead(sess *ncp.Session, clientId string) error { + var conn net.Conn + if sess == ts.remoteSess { + conn = ts.remoteConns[clientId] + } else { + conn = ts.localConns[clientId] + } + + nc := conn.(*mockconn.NetConn) + fmt.Printf("session %v conn %v is to close reading\n", sess.LocalAddr(), clientId) + return nc.CloseRead() +} + +func (ts *TestSession) PauseOneConnectionRead(sess *ncp.Session, clientId string) { + var conn net.Conn + if sess == ts.remoteSess { + conn = ts.remoteConns[clientId] + } else { + conn = ts.localConns[clientId] + } + + nc := conn.(*mockconn.NetConn) + nc.PauseRead() + fmt.Printf("session %v conn %v is to pause reading\n", sess.LocalAddr(), clientId) +} + +func (ts *TestSession) ResumeOneConnectionRead(sess *ncp.Session, clientId string) { + var conn net.Conn + if sess == ts.remoteSess { + conn = ts.remoteConns[clientId] + } else { + conn = ts.localConns[clientId] + } + + nc := conn.(*mockconn.NetConn) + nc.ResumeRead() + fmt.Printf("session %v conn %v is to resume reading\n", sess.LocalAddr(), clientId) +} + +func (ts *TestSession) OpenOneConnectionRead(sess *ncp.Session, clientId string) error { + var conn net.Conn + if sess == ts.remoteSess { + conn = ts.remoteConns[clientId] + } else { + conn = ts.localConns[clientId] + } + + nc := conn.(*mockconn.NetConn) + fmt.Printf("session %v conn %v is to open read\n", sess.LocalAddr(), clientId) + return nc.SetReadDeadline(time.Time{}) +} + +func (ts *TestSession) CloseOneConnectionWrite(sess *ncp.Session, clientId string) error { + var conn net.Conn + if sess == ts.remoteSess { + conn = ts.remoteConns[clientId] + } else { + conn = ts.localConns[clientId] + } + nc := conn.(*mockconn.NetConn) + return nc.CloseWrite() +} diff --git a/test/throughput_test.go b/test/throughput_test.go new file mode 100644 index 0000000..cff4de0 --- /dev/null +++ b/test/throughput_test.go @@ -0,0 +1,170 @@ +package test + +import ( + "fmt" + "testing" + + mockconn "github.com/nknorg/mockconn-go" +) + +// go test -v -run=TestBaseClient +func TestBaseClient(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + tc := &TestCase{id: id, name: fmt.Sprintf("Base case, one client throughput %v packets/s, latency %v, loss %v", + baseConf.Throughput, baseConf.Latency, baseConf.Loss), bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = baseConf + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "base") + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestBaseAndLowTpHighLat +func TestBaseAndLowTpHighLat(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + lowTpHighLatConf := baseConf + lowTpHighLatConf.Throughput = lowThroughput + lowTpHighLatConf.Latency = highLatency + tc := &TestCase{id: id, name: fmt.Sprintf("Append 1 client throughput %v packets/s, latency %v, loss %v to base case", + lowTpHighLatConf.Throughput, lowTpHighLatConf.Latency, lowTpHighLatConf.Loss), bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = lowTpHighLatConf + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "base_and_1_low_tp") + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestBaseAnd2LowTpHighLat +func TestBaseAnd2LowTpHighLat(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + lowTpHighLatConf := baseConf + lowTpHighLatConf.Throughput = lowThroughput + lowTpHighLatConf.Latency = highLatency + tc := &TestCase{id: id, name: fmt.Sprintf("Append 2 clients throughput %v packets/s, latency %v, loss %v to base case", + lowTpHighLatConf.Throughput, lowTpHighLatConf.Latency, lowTpHighLatConf.Loss), bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = lowTpHighLatConf + tc.mockConfigs["2"] = lowTpHighLatConf + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "base_and_2_low_tp") + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestBaseAnd3LowTpHighLat +func TestBaseAnd3LowTpHighLat(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + lowTpHighLatConf := baseConf + lowTpHighLatConf.Throughput = lowThroughput + lowTpHighLatConf.Latency = highLatency + tc := &TestCase{id: id, name: fmt.Sprintf("Append 3 clients throughput %v packets/s, latency %v, loss %v to base case", + lowTpHighLatConf.Throughput, lowTpHighLatConf.Latency, lowTpHighLatConf.Loss), bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = lowTpHighLatConf + tc.mockConfigs["2"] = lowTpHighLatConf + tc.mockConfigs["3"] = lowTpHighLatConf + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "base_and_3_low_tp") + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestBaseAnd3LowTpHighLat +func TestBaseAnd4LowTpHighLat(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + lowTpHighLatConf := baseConf + lowTpHighLatConf.Throughput = lowThroughput + lowTpHighLatConf.Latency = highLatency + tc := &TestCase{id: id, name: fmt.Sprintf("Append 4 clients throughput %v packets/s, latency %v, loss %v to base case", + lowTpHighLatConf.Throughput, lowTpHighLatConf.Latency, lowTpHighLatConf.Loss), bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = lowTpHighLatConf + tc.mockConfigs["2"] = lowTpHighLatConf + tc.mockConfigs["3"] = lowTpHighLatConf + tc.mockConfigs["4"] = lowTpHighLatConf + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "base_and_4_low_tp") + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestBaseAnd3DifferTp +func TestBaseAnd3DifferTp(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + low1, low2, low3 := baseConf, baseConf, baseConf + low1.Throughput = baseConf.Throughput / 2 + low2.Throughput = baseConf.Throughput / 4 + low3.Throughput = baseConf.Throughput / 8 + + tc := &TestCase{id: id, name: fmt.Sprintf("Append 3 different throughput %v,%v,%v packets/s, latency %v, loss %v to base case", + low1.Throughput, low2.Throughput, low3.Throughput, baseConf.Latency, baseConf.Loss), bytesToSend: bytesToSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = baseConf + tc.mockConfigs["1"] = low1 + tc.mockConfigs["2"] = low2 + tc.mockConfigs["3"] = low3 + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "base_and_3_diff_tp") + testResult = append(testResult, tc) + PrintResult(testResult) +} + +// go test -v -run=TestTpTrendFromAvg +func TestTpTrendFromAvg(t *testing.T) { + if testResult == nil { + testResult = make([]*TestCase, 0) + } + + id++ + conf1, conf2 := baseConf, baseConf + conf1.Throughput = 1024 + conf2.Throughput = 512 + toSend := 80 << 20 + + tc := &TestCase{id: id, name: fmt.Sprintf("Two connections, throughputs are %v,%v packets/s, latency %v, loss %v to base case", + conf1.Throughput, conf2.Throughput, baseConf.Latency, baseConf.Loss), bytesToSend: toSend} + tc.mockConfigs = make(map[string]mockconn.ConnConfig) + tc.mockConfigs["0"] = conf1 + tc.mockConfigs["1"] = conf2 + tc.numClients = len(tc.mockConfigs) + + tc = run(tc, "tp_trend_from_avg") + testResult = append(testResult, tc) + PrintResult(testResult) +}