diff --git a/client.go b/client.go index c3b1d32..2080847 100644 --- a/client.go +++ b/client.go @@ -21,7 +21,6 @@ import ( "github.com/nknorg/nkngomobile" "github.com/nknorg/tuna" tpb "github.com/nknorg/tuna/pb" - "github.com/nknorg/tuna/udp" gocache "github.com/patrickmn/go-cache" "google.golang.org/protobuf/proto" ) @@ -48,7 +47,7 @@ type TunaSessionClient struct { onConnect chan struct{} onClose chan struct{} connectedOnce sync.Once - udpConn *udp.EncryptUDPConn + udpConn *tuna.EncryptUDPConn sync.RWMutex listeners []net.Listener @@ -849,7 +848,7 @@ func (c *TunaSessionClient) startExits() error { return nil } -func (c *TunaSessionClient) ListenUDP(addrsRe *nkngomobile.StringArray) (*udp.EncryptUDPConn, error) { +func (c *TunaSessionClient) ListenUDP(addrsRe *nkngomobile.StringArray) (*tuna.EncryptUDPConn, error) { acceptAddrs, err := getAcceptAddrs(addrsRe) if err != nil { return nil, err @@ -887,15 +886,15 @@ func (c *TunaSessionClient) ListenUDP(addrsRe *nkngomobile.StringArray) (*udp.En return nil, err } - c.udpConn = udp.NewEncryptUDPConn(conn) + c.udpConn = tuna.NewEncryptUDPConn(conn) return c.udpConn, nil } -func (c *TunaSessionClient) DialUDP(remoteAddr string) (*udp.EncryptUDPConn, error) { +func (c *TunaSessionClient) DialUDP(remoteAddr string) (*tuna.EncryptUDPConn, error) { return c.DialUDPWithConfig(remoteAddr, nil) } -func (c *TunaSessionClient) DialUDPWithConfig(remoteAddr string, config *nkn.DialConfig) (*udp.EncryptUDPConn, error) { +func (c *TunaSessionClient) DialUDPWithConfig(remoteAddr string, config *nkn.DialConfig) (*tuna.EncryptUDPConn, error) { config, err := nkn.MergeDialConfig(c.config.SessionConfig, config) if err != nil { return nil, err @@ -919,7 +918,7 @@ func (c *TunaSessionClient) DialUDPWithConfig(remoteAddr string, config *nkn.Dia return nil, err } - udpConn := new(udp.EncryptUDPConn) + udpConn := new(tuna.EncryptUDPConn) for i, addr := range pubAddrs.Addrs { if len(addr.IP) > 0 && addr.Port > 0 { udpAddr := net.UDPAddr{IP: net.ParseIP(addr.IP), Port: int(addr.Port)} @@ -931,7 +930,7 @@ func (c *TunaSessionClient) DialUDPWithConfig(remoteAddr string, config *nkn.Dia log.Printf("dial udp err: %v", err) continue } - udpConn = udp.NewEncryptUDPConn(conn) + udpConn = tuna.NewEncryptUDPConn(conn) break } } diff --git a/examples/throughput/main.go b/examples/throughput/main.go index aa7ef87..31062ac 100644 --- a/examples/throughput/main.go +++ b/examples/throughput/main.go @@ -1,10 +1,12 @@ package main import ( + "crypto/rand" "encoding/binary" "encoding/hex" "flag" "fmt" + "github.com/nknorg/tuna" "log" "math" "net" @@ -12,8 +14,8 @@ import ( "strings" "time" - ncp "github.com/nknorg/ncp-go" - nkn "github.com/nknorg/nkn-sdk-go" + "github.com/nknorg/ncp-go" + "github.com/nknorg/nkn-sdk-go" ts "github.com/nknorg/nkn-tuna-session" "github.com/nknorg/tuna/geo" ) @@ -23,6 +25,9 @@ const ( listenID = "bob" ) +var udpBytesReceived = 0 +var udpBytesSend = 0 + func read(sess net.Conn) error { timeStart := time.Now() @@ -95,6 +100,64 @@ func write(sess net.Conn, numBytes int) error { return nil } +func readUDP(conn *tuna.EncryptUDPConn, numBytes int) error { + defer conn.Close() + buffer := make([]byte, 1024) + var timeStart time.Time + for { + err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + if err != nil { + return err + } + n, _, err := conn.ReadFromUDP(buffer) + if udpBytesReceived == 0 { + timeStart = time.Now() + } + if err != nil { + return err + } + udpBytesReceived += n + if ((udpBytesReceived - n) * 10 / numBytes) != (udpBytesReceived * 10 / numBytes) { + mbTobytes := math.Pow(2, 20) + sent := float64(udpBytesSend) / mbTobytes + received := float64(udpBytesReceived) / mbTobytes + timeEnd := time.Now() + elapsed := timeEnd.Sub(timeStart) + speed := received / elapsed.Seconds() + log.Printf("UDP: Received %.2f MB bytes, speed: %.2f MB/s, package loss: %.2f%% \n", received, speed, 100*(1-received/sent)) + } + } +} + +func writeUDP(conn *tuna.EncryptUDPConn, numBytes int) error { + defer conn.Close() + buffer := make([]byte, 1024) + timeStart := time.Now() + for { + rand.Read(buffer) + n, _, err := conn.WriteMsgUDP(buffer, nil, nil) + if err != nil { + return err + } + time.Sleep(100 * time.Nanosecond) + udpBytesSend += n + if udpBytesSend > numBytes { + break + } + if ((udpBytesSend - n) * 10 / numBytes) != (udpBytesSend * 10 / numBytes) { + mbTobytes := math.Pow(2, 20) + sent := float64(udpBytesSend) / mbTobytes + received := float64(udpBytesReceived) / mbTobytes + timeEnd := time.Now() + elapsed := timeEnd.Sub(timeStart) + speed := sent / elapsed.Seconds() + log.Printf("UDP: Sent %.2f MB bytes, speed: %.2f MB/s, package loss: %.2f%% \n", sent, speed, 100*(1-received/sent)) + } + } + + return nil +} + func main() { numTunaListeners := flag.Int("n", 1, "number of tuna listeners") numBytes := flag.Int("m", 1, "data to send (MB)") @@ -109,6 +172,7 @@ func main() { tunaMeasureBandwidth := flag.Bool("tmb", false, "tuna measure bandwidth") tunaMaxPrice := flag.String("price", "0.01", "tuna reverse service max price in unit of NKN/MB") mtu := flag.Int("mtu", 0, "ncp session mtu") + u := flag.Bool("u", false, "send data through UDP instead TCP") flag.Parse() @@ -162,32 +226,43 @@ func main() { log.Fatal(err) } - err = c.Listen(nil) - if err != nil { - log.Fatal(err) - } - - <-c.OnConnect() - - log.Println("Listening at", c.Addr()) - - go func() { - for { - s, err := c.Accept() + if *u { + uConn, err := c.ListenUDP(nil) + if err != nil { + log.Fatal(err) + } + log.Println("Listening UDP at", c.Addr()) + go func() { + err = readUDP(uConn, *numBytes) if err != nil { log.Fatal(err) } - log.Println(c.Addr(), "accepted a session") - - go func(s net.Conn) { - err := read(s) + }() + } else { + err = c.Listen(nil) + if err != nil { + log.Fatal(err) + } + log.Println("Listening at", c.Addr()) + go func() { + for { + s, err := c.Accept() if err != nil { log.Fatal(err) } - s.Close() - }(s) - } - }() + log.Println(c.Addr(), "accepted a session") + + go func(s net.Conn) { + err := read(s) + if err != nil { + log.Fatal(err) + } + s.Close() + }(s) + } + }() + } + <-c.OnConnect() } if *dial { @@ -208,24 +283,38 @@ func main() { *dialAddr = listenID + "." + strings.SplitN(c.Addr().String(), ".", 2)[1] } - s, err := c.DialWithConfig(*dialAddr, dialConfig) - if err != nil { - log.Fatal(err) - } - log.Println(c.Addr(), "dialed a session") - - go func() { - err := write(s, *numBytes) + if *u { + udpConn, err := c.DialUDPWithConfig(*dialAddr, dialConfig) if err != nil { log.Fatal(err) } - for { - if s.IsClosed() { - os.Exit(0) + log.Println(c.Addr(), "dialed a UDP session") + go func() { + err := writeUDP(udpConn, *numBytes) + if err != nil { + log.Fatal(err) } - time.Sleep(time.Millisecond * 100) + }() + } else { + s, err := c.DialWithConfig(*dialAddr, dialConfig) + if err != nil { + log.Fatal(err) } - }() + log.Println(c.Addr(), "dialed a session") + + go func() { + err := write(s, *numBytes) + if err != nil { + log.Fatal(err) + } + for { + if s.IsClosed() { + os.Exit(0) + } + time.Sleep(time.Millisecond * 100) + } + }() + } } select {} diff --git a/go.mod b/go.mod index 24a9c2d..7c62583 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/nknorg/nkn-sdk-go v1.4.3 github.com/nknorg/nkn/v2 v2.1.8 github.com/nknorg/nkngomobile v0.0.0-20220615081414-671ad1afdfa9 - github.com/nknorg/tuna v0.0.0-20230224012633-b53aa9c90cd5 + github.com/nknorg/tuna v0.0.0-20230307074911-ced36707f273 github.com/patrickmn/go-cache v2.1.0+incompatible golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e google.golang.org/protobuf v1.27.1 diff --git a/go.sum b/go.sum index ad03f25..83a34c0 100644 --- a/go.sum +++ b/go.sum @@ -255,8 +255,8 @@ github.com/nknorg/nkngomobile v0.0.0-20220615081414-671ad1afdfa9 h1:Gr37j7Ttvcn8 github.com/nknorg/nkngomobile v0.0.0-20220615081414-671ad1afdfa9/go.mod h1:zNY9NCyBcJCCDrXhwOjKarkW5cngPs/Z82xVNy/wvEA= github.com/nknorg/nnet v0.0.0-20220217113219-4d27780990b1/go.mod h1:4DHEQEMhlRGIKGSyhATdjeusdqaHafDatadtpeHBpvI= github.com/nknorg/portmapper v0.0.0-20200114081049-1c03cdccc283/go.mod h1:dL4PQJ4670oTO6LqvkjrBQEkD+iMiOYjlKRBBw55Csg= -github.com/nknorg/tuna v0.0.0-20230224012633-b53aa9c90cd5 h1:b6uQxUCJ3Xtq+YO8asJYzGKdP1ZvtCB7qKqt178blC0= -github.com/nknorg/tuna v0.0.0-20230224012633-b53aa9c90cd5/go.mod h1:HtzD/h1ARuyCKoM9d28HWHsel0M4e5UNuk4j9928i9s= +github.com/nknorg/tuna v0.0.0-20230307074911-ced36707f273 h1:pE6Tq5iq/Il5ZEYu/HZ9mNtHYqKLYS2FmyZfP0LkMxs= +github.com/nknorg/tuna v0.0.0-20230307074911-ced36707f273/go.mod h1:HtzD/h1ARuyCKoM9d28HWHsel0M4e5UNuk4j9928i9s= github.com/nrdcg/auroradns v1.0.1/go.mod h1:y4pc0i9QXYlFCWrhWrUSIETnZgrf4KuwjDIWmmXo3JI= github.com/nrdcg/dnspod-go v0.4.0/go.mod h1:vZSoFSFeQVm2gWLMkyX61LZ8HI3BaqtHZWgPTGKr6KQ= github.com/nrdcg/goinwx v0.7.0/go.mod h1:4tKJOCi/1lTxuw9/yB2Ez0aojwtUCSkckjc22eALpqE= diff --git a/tests/session_test.go b/tests/session_test.go index 60d6062..6d7163f 100644 --- a/tests/session_test.go +++ b/tests/session_test.go @@ -3,17 +3,19 @@ package tests import ( "bytes" "crypto/rand" + "encoding/hex" "errors" "github.com/nknorg/ncp-go" "github.com/nknorg/nkn-sdk-go" ts "github.com/nknorg/nkn-tuna-session" "github.com/nknorg/nkn/v2/crypto" + "github.com/nknorg/tuna" _ "github.com/nknorg/tuna/tests" - "github.com/nknorg/tuna/udp" "io" "net" "os" "strings" + "sync" "testing" "time" ) @@ -102,20 +104,6 @@ func TestTunaSession(t *testing.T) { } }() - go func() { - buffer := make([]byte, 65536) - for { - n, addr, err := uConn.ReadFromUDP(buffer) - if err != nil { - t.Fatal(err) - } - n, _, err = uConn.WriteMsgUDP(buffer[:n], nil, addr) - if err != nil { - t.Fatal(err) - } - } - }() - mm, err := nkn.NewMultiClient(account2, dialID, 4, false, clientConfig) if err != nil { t.Fatal(err) @@ -148,7 +136,7 @@ func TestTunaSession(t *testing.T) { t.Fatal(err) } - err = testUDP(udpConn) + err = testUDP(udpConn, uConn) if err != nil { t.Fatal(err) } @@ -169,16 +157,56 @@ func testTCP(conn net.Conn) error { return nil } -func testUDP(conn *udp.EncryptUDPConn) error { - send := make([]byte, 4096) - receive := make([]byte, 4096) - for i := 0; i < 10; i++ { - rand.Read(send) - conn.WriteMsgUDP(send, nil, nil) - conn.ReadFromUDP(receive) - if !bytes.Equal(send, receive) { - return errors.New("bytes not equal") +func testUDP(from, to *tuna.EncryptUDPConn) error { + count := 1000 + sendList := make([]string, count) + recvList := make([]string, count) + sendNum := 0 + recvNum := 0 + var wg sync.WaitGroup + var e error + go func() { + wg.Add(1) + receive := make([]byte, 1024) + for i := 0; i < count; i++ { + _, _, err := to.ReadFromUDP(receive) + if err != nil { + e = err + return + } + recvNum++ + recvList = append(recvList, hex.EncodeToString(receive)) + } + wg.Done() + }() + + go func() { + time.Sleep(1 * time.Second) + send := make([]byte, 1024) + wg.Add(1) + for i := 0; i < count; i++ { + rand.Read(send) + _, _, err := from.WriteMsgUDP(send, nil, nil) + if err != nil { + e = err + return + } + sendNum++ + sendList = append(sendList, hex.EncodeToString(send)) } + wg.Done() + }() + + wg.Wait() + if sendNum != recvNum { + return errors.New("package lost") } - return nil + + for i := 0; i < sendNum; i++ { + if sendList[i] != recvList[i] { + return errors.New("data mismatch") + } + } + + return e }