Skip to content

Commit

Permalink
Cache public addresses for re-dialing
Browse files Browse the repository at this point in the history
Signed-off-by: bill fort <fxbao@hotmail.com>
  • Loading branch information
billfort committed Mar 28, 2023
1 parent 9a596c5 commit 9700d11
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 23 deletions.
61 changes: 60 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type TunaSessionClient struct {
connCount map[string]int
closedSessionKey *gocache.Cache
isClosed bool
pubAddrs map[string]*PubAddrs // cached pub addrs, map key is remote address.

tunaNode *types.Node

Expand All @@ -91,6 +92,7 @@ func NewTunaSessionClient(clientAccount *nkn.Account, m *nkn.MultiClient, wallet
sharedKeys: make(map[string]*[sharedKeySize]byte),
connCount: make(map[string]int),
closedSessionKey: gocache.New(closedSessionKeyExpiration, closedSessionKeyCleanupInterval),
pubAddrs: make(map[string]*PubAddrs),
}

go c.removeClosedSessions()
Expand Down Expand Up @@ -248,6 +250,10 @@ func (c *TunaSessionClient) shouldAcceptAddr(addr string) bool {
}

func (c *TunaSessionClient) getPubAddrsFromRemote(ctx context.Context, remoteAddr string, sessionID []byte) (*PubAddrs, error) {
if pubAddrs := c.getCachedPubAddrs(remoteAddr); pubAddrs != nil {
return pubAddrs, nil
}

buf, err := json.Marshal(&Request{Action: ActionGetPubAddr, SessionID: sessionID})
if err != nil {
return nil, err
Expand Down Expand Up @@ -308,11 +314,21 @@ func (c *TunaSessionClient) getPubAddrsFromRemote(ctx context.Context, remoteAdd
return nil, ErrClosed
}

valid := 0
for _, addr := range pubAddrs.Addrs {
if len(addr.IP) > 0 && addr.Port != 0 {
return pubAddrs, nil
valid++
}
}
if valid == len(pubAddrs.Addrs) {
c.Lock()
c.pubAddrs[remoteAddr] = pubAddrs
c.Unlock()
}
if valid > 0 {
return pubAddrs, nil
}

return nil, ErrNoPubAddr
}

Expand Down Expand Up @@ -697,6 +713,9 @@ func (c *TunaSessionClient) Close() error {

for _, conns := range c.sessionConns {
for _, conn := range conns {
if conn == nil {
continue
}
err := conn.Close()
if err != nil {
log.Println("Conn close error:", err)
Expand Down Expand Up @@ -784,6 +803,7 @@ func (c *TunaSessionClient) handleConn(conn *Conn, sessKey string, i int) {
defer func() {
c.Lock()
c.connCount[sessKey]--
delete(c.sessionConns[sessKey], connID(i))
shouldClose := c.connCount[sessKey] == 0
if shouldClose {
delete(c.sessions, sessKey)
Expand Down Expand Up @@ -1011,3 +1031,42 @@ func (c *TunaSessionClient) IsSessClosed(sessKey string) bool {
func (c *TunaSessionClient) SetTunaNode(node *types.Node) {
c.tunaNode = node
}

func (c *TunaSessionClient) getCachedPubAddrs(remoteAddr string) *PubAddrs {
c.RLock()
cachedAddr, ok := c.pubAddrs[remoteAddr]
c.RUnlock()
if !ok {
return nil
}

for i := 0; i < len(cachedAddr.Addrs); i++ {
if cachedAddr.Addrs[i] == nil {
return nil
}

// Check if any connection is alive on this address
alive := false
c.RLock()
for sessKey, conns := range c.sessionConns {
ra := getRemoteAddrFromSessionKey(sessKey)
if ra == remoteAddr {
conn, ok := conns[connID(i)]
if !ok {
continue
}
if cachedAddr.Addrs[i].String() != conn.RemoteAddr().String() {
alive = true
break
}
}
}
c.RUnlock()

if !alive {
return nil
}
}

return cachedAddr
}
4 changes: 4 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type PubAddr struct {
OutPrice string `json:"outPrice,omitempty"`
}

func (pa *PubAddr) String() string {
return fmt.Sprintf("%v:%v", pa.IP, pa.Port)
}

type PubAddrs struct {
Addrs []*PubAddr `json:"addrs"`
SessionClosed bool `json:"sessionClosed"`
Expand Down
22 changes: 0 additions & 22 deletions tests/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,28 +100,6 @@ func TestCloseAllConnDialer(t *testing.T) {
<-ch
}

// go test -v -run=TestGetPubAddrsFromRemote
// This test case need export tuna session client some private function and member.
// So only test it when developing.
// func TestGetPubAddrsFromRemote(t *testing.T) {
// ch := make(chan string, 1)

// go func() {
// // wait for Listener be ready
// time.Sleep(2 * time.Second)
// tunaSess, _ := StartTunaTcpDialer(bytesToSend, numTcpListener, ch)
// time.Sleep(5 * time.Second)
// pubAddrs1, _ := tunaSess.GetPubAddrsFromRemote(context.Background(), remoteAddr, tunaSess.SessionID)
// log.Printf("pubAddrs1: %+v", pubAddrs1)
// pubAddrs2, _ := tunaSess.GetPubAddrsFromRemote(context.Background(), remoteAddr, tunaSess.SessionID)
// log.Printf("pubAddrs2: %+v", pubAddrs2)
// require.Equal(t, pubAddrs1, pubAddrs2)
// }()

// <-ch
// <-ch
// }

func readTcp(sess net.Conn) error {
timeStart := time.Now()

Expand Down
9 changes: 9 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package session
import (
"encoding/hex"
"strconv"
"strings"
"time"
)

Expand All @@ -14,6 +15,14 @@ func sessionKey(remoteAddr string, sessionID []byte) string {
return remoteAddr + ":" + hex.EncodeToString(sessionID)
}

func getRemoteAddrFromSessionKey(sessKey string) string {
remotAddr, _, ok := strings.Cut(sessKey, ":")
if ok {
return remotAddr
}
return ""
}

func connID(i int) string {
return strconv.Itoa(i)
}

0 comments on commit 9700d11

Please sign in to comment.