Skip to content

Commit

Permalink
Support create multi tunnels by one multi-client
Browse files Browse the repository at this point in the history
Signed-off-by: bill fort <fxbao@hotmail.com>
  • Loading branch information
billfort committed Jun 19, 2023
1 parent 4b6e570 commit b09ca88
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 192 deletions.
13 changes: 6 additions & 7 deletions tests/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ const (
bytesToSend = 1024
bufLen = 100
numClients = 1
listenerId = "Bob"
dialerId = "Alice"
seedHex = "e68e046d13dd911594576ba0f4a196e9666790dc492071ad9ea5972c0b940435"
remoteAddr = "Bob.be285ff9330122cea44487a9618f96603fde6d37d5909ae1c271616772c349fe"

toAddrTcp = "127.0.0.1:54321"
fromAddrTcp = "127.0.0.1:12345"

toAddrUdp = "127.0.0.1:54321"
fromAddrUdp = "127.0.0.1:12346"
listenerId = "Bob1"
toPort = "127.0.0.1:54321"
)

var fromPorts = []string{"127.0.0.1:12345"}
var fromUDPPorts = []string{"127.0.0.1:22345"}
var remoteAddrs = []string{"Bob1.be285ff9330122cea44487a9618f96603fde6d37d5909ae1c271616772c349fe"}
91 changes: 80 additions & 11 deletions tests/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/hex"
"fmt"
"log"
"strings"
"time"

nkn "github.com/nknorg/nkn-sdk-go"
ts "github.com/nknorg/nkn-tuna-session"
Expand All @@ -24,20 +26,20 @@ const (
tunnelClientIsReady = "tunnel client is ready"
tcpServerIsReady = "tcp server is ready"
udpServerIsReady = "udp server is ready"
tcpDialerExit = "tcp dialer exit"
exit = "exit"
tcpServerExit = "tcp server exit"
udpServerExit = "udp server exit"
udpClientExit = "udp client exit"
)

var ch chan string = make(chan string, 4)
var ch chan string

func waitFor(ch chan string, status string) {
fmt.Println("waiting for ", status)
fmt.Println("Waiting for:", status)
for {
str := <-ch
fmt.Println("waitFor got: ", str)
if status == str {
fmt.Println("Got:", str)
if strings.Contains(str, status) {
break
}
}
Expand Down Expand Up @@ -104,12 +106,9 @@ func CreateTunaSession(account *nkn.Account, wallet *nkn.Wallet, mc *nkn.MultiCl
return
}

var node *types.Node
var tunaNode *types.Node

func CreateTunnelConfig(udp bool) *tunnel.Config {
if node == nil {
node = StartTunaNode()
}
config := &tunnel.Config{
NumSubClients: numClients,
ClientConfig: CreateClientConfig(3),
Expand All @@ -118,7 +117,7 @@ func CreateTunnelConfig(udp bool) *tunnel.Config {
TunaSessionConfig: CreateTunaSessionConfig(numClients),
Verbose: true,
UDP: udp,
TunaNode: node,
TunaNode: tunaNode,
}

return config
Expand Down Expand Up @@ -151,6 +150,7 @@ func StartTunaNode() *types.Node {
func runReverseEntry(seed []byte) error {
entryAccount, err := vault.NewAccountWithSeed(seed)
if err != nil {
fmt.Println("runReverseEntry vault.NewAccountWithSeed err ", err)
return err
}
seedRPCServerAddr := nkn.NewStringArray(nkn.DefaultSeedRPCServerAddr...)
Expand All @@ -160,19 +160,88 @@ func runReverseEntry(seed []byte) error {
}
entryWallet, err := nkn.NewWallet(&nkn.Account{Account: entryAccount}, walletConfig)
if err != nil {
fmt.Println("runReverseEntry nkn.NewWallet err ", err)
return err
}
entryConfig := new(tuna.EntryConfiguration)
err = util.ReadJSON("config.reverse.entry.json", entryConfig)
if err != nil {
fmt.Println("runReverseEntry util.ReadJSON err ", err)
return err
}
err = tuna.StartReverse(entryConfig, entryWallet)
if err != nil {
fmt.Println("runReverseEntry tuna.StartReverse err ", err)
return err
}

ch <- tunaNodeStarted
return nil
}

func StartTunnelListeners(tuna bool) error {
acc, _, err := CreateAccountAndWallet(seedHex)
if err != nil {
return err
}

config := CreateTunnelConfig(tuna)

tunnels, err := tunnel.NewTunnels(acc, listenerId, []string{"nkn"}, []string{toPort}, tuna, config)
if err != nil {
return err
}
time.Sleep(10 * time.Second) // wait for tuna node is ready
if tuna {
for _, t := range tunnels {
ts := t.TunaSessionClient()
<-ts.OnConnect()
ch <- tunaSessionConnected
}
}
ch <- tunnelServerIsReady
fmt.Printf("tunnel server is ready, toPort is %v\n", toPort)

for _, t := range tunnels {
err = t.Start()
if err != nil {
return err
}
}

return nil
}

func StartTunnelDialers(tcp, tuna bool) error {
acc, _, err := CreateAccountAndWallet(seedHex)
if err != nil {
return err
}

config := CreateTunnelConfig(tuna)
var from []string
if tcp {
from = fromPorts
} else {
from = fromUDPPorts
}

tunnels, err := tunnel.NewTunnels(acc, dialerId, from, remoteAddrs, tuna, config)
if err != nil {
return err
}

for _, t := range tunnels {
go func(t *tunnel.Tunnel) {
err := t.Start()
if err != nil {
fmt.Printf("tunnel.Start err: %v\n", err)
return
}
}(t)
}

select {}
time.Sleep(5 * time.Second) // Tunnel start time
ch <- tunnelClientIsReady
return nil
}
138 changes: 63 additions & 75 deletions tests/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package tests
import (
"fmt"
"net"
"os"
"strings"
"sync"
"testing"
"time"

tunnel "github.com/nknorg/nkn-tunnel"
)

// go test -v -run=TestTCPWriteReadData
func TestTCPWriteReadData(t *testing.T) {
// go test -v -run=TestTCP
func TestTCP(t *testing.T) {
ch = make(chan string, 4)
if tunaNode == nil {
tunaNode = StartTunaNode()
waitFor(ch, tunaNodeStarted)
}

go func() {
err := StartTcpServer()
if err != nil {
Expand All @@ -22,18 +28,19 @@ func TestTCPWriteReadData(t *testing.T) {
waitFor(ch, tcpServerIsReady)

tuna := true

go func() {
err := StartTunnelListener(toAddrTcp, tuna)
err := StartTunnelListeners(tuna)
if err != nil {
fmt.Printf("StartTunnelListener err: %v\n", err)
return
fmt.Printf("StartTunnelListeners err: %v\n", err)
os.Exit(-1)
}
}()

waitFor(ch, tunnelServerIsReady)

go func() {
err := StartTunnelDialer(fromAddrTcp, tuna)
err := StartTunnelDialers(true, tuna)
if err != nil {
fmt.Printf("StartTunnelDialer err: %v\n", err)
return
Expand All @@ -42,64 +49,18 @@ func TestTCPWriteReadData(t *testing.T) {

waitFor(ch, tunnelClientIsReady)

go StartTcpDialer()
go StartTcpDialers()

waitFor(ch, tcpServerExit)
}

func StartTunnelListener(toAddr string, tuna bool) error {
acc, _, err := CreateAccountAndWallet(seedHex)
if err != nil {
return err
}

config := CreateTunnelConfig(true)
tun, err := tunnel.NewTunnel(acc, listenerId, "nkn", toAddr, tuna, config)
if err != nil {
return err
}
time.Sleep(10 * time.Second) // wait for tuna node is ready

ts := tun.TunaSessionClient()
<-ts.OnConnect()
ch <- tunaSessionConnected
ch <- tunnelServerIsReady
fmt.Printf("tunnel server is ready, toAddr is %v\n", toAddr)

err = tun.Start()
if err != nil {
return err
}
return nil
}

func StartTunnelDialer(fromAddr string, tuna bool) error {
acc, _, err := CreateAccountAndWallet(seedHex)
if err != nil {
return err
}

config := CreateTunnelConfig(true)
tun, err := tunnel.NewTunnel(acc, listenerId, fromAddr, remoteAddr, tuna, config)
if err != nil {
return err
}

ch <- tunnelClientIsReady

err = tun.Start()
if err != nil {
return err
}
return nil
close(ch)
}

func StartTcpServer() error {
listener, err := net.Listen("tcp", toAddrTcp)
listener, err := net.Listen("tcp", toPort)
if err != nil {
return err
}
fmt.Printf("tcp server is listening at %v\n", toAddrTcp)
fmt.Printf("StartTcpServer is listening at %v\n", toPort)
ch <- tcpServerIsReady

conn, err := listener.Accept()
Expand All @@ -111,33 +72,60 @@ func StartTcpServer() error {
for {
n, err := conn.Read(b)
if err != nil {
fmt.Printf("StartTcpServer conn.Read err %v\n", err)
return err
}

fmt.Printf("tcp server read: %v\n", string(b[:n]))
if strings.Contains(string(b[:n]), tcpDialerExit) {
fmt.Printf("TCP Server got: %v\n", string(b[:n]))
if strings.Contains(string(b[:n]), exit) {
break
}
// echo
_, err = conn.Write(b[:n])
if err != nil {
fmt.Printf("StartTcpServer conn.Write err %v\n", err)
return err
}
}
ch <- tcpServerExit
return nil
}

func StartTcpDialer() error {
conn, err := net.Dial("tcp", fromAddrTcp)
if err != nil {
return err
func StartTcpDialers() error {
var wg sync.WaitGroup
for i, fromPort := range fromPorts {
wg.Add(1)
go func(clientNum int, from string) {
defer wg.Done()
conn, err := net.Dial("tcp", from)
if err != nil {
fmt.Printf("StartTcpDialers net.Dial to %v err %v\n", from, err)
return
}

for i := 0; i < 10; i++ {
msg := fmt.Sprintf("tcp client %v data %v", clientNum, i)
_, err := conn.Write([]byte(msg))
if err != nil {
fmt.Printf("StartTcpDialers conn.Write to %v err %v\n", from, err)
return
}
b := make([]byte, 1024)
n, err := conn.Read(b)
if err != nil {
fmt.Printf("StartTcpDialers conn.Read to %v err %v\n", from, err)
return
}
if string(b[:n]) != msg {
fmt.Printf("StartTcpDialers get echo %v, it should be %v\n", string(b[:n]), msg)
return
}
fmt.Printf("TCP Client %v got echo: %v\n", clientNum, string(b[:n]))
}
conn.Write([]byte(exit))
time.Sleep(2 * time.Second) // wait for tcp server get it
}(i, fromPort)
}

for i := 0; i < 10; i++ {
_, err := conn.Write([]byte(fmt.Sprintf("tcp client data %v\n", i)))
if err != nil {
return err
}
}
conn.Write([]byte(tcpDialerExit))
time.Sleep(2 * time.Second) // wait for tcp server get it

ch <- tcpDialerExit
wg.Wait()
return nil
}
Loading

0 comments on commit b09ca88

Please sign in to comment.