Skip to content

Commit

Permalink
Feat/tcp msg (#66)
Browse files Browse the repository at this point in the history
* feat: update tcp

* feat: update tcp

* feat: update log
  • Loading branch information
cntechpower authored Jun 1, 2024
1 parent 9034314 commit 446f38b
Show file tree
Hide file tree
Showing 26 changed files with 232 additions and 177 deletions.
24 changes: 12 additions & 12 deletions agent/agent/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (a *Agent) mustGetTlsConnToServer() *_tls.Conn {
var err error
c, err = tls.DialTlsServer(a.addr.IP.String(), a.addr.Port, a.credential)
if err != nil {
h.Errorf("can not connect to server %v, error: %v", a.addr, err)
h.Errorf("can not connect to server %+v, error: %+v", a.addr, err)
// sleep 5 second and retry
time.Sleep(5 * time.Second)
continue
Expand All @@ -35,17 +35,17 @@ func (a *Agent) newProxyConn(localAddr string) {
h := log.NewHeader("newProxyConn")
dst, err := net.Dial("tcp", localAddr)
if err != nil {
h.Errorf("error while dial to localAddr %v", err)
h.Errorf("error while dial to localAddr %+v", err)
return
}
// let server use this local connection
c := conn.NewWrappedConn("server", a.mustGetTlsConnToServer())
if err := c.Send(model.NewTunnelBeginMsg(a.user, a.zone, a.id, localAddr)); err != nil {
h.Errorf("error while send tunnel pkg : %v", err)
h.Errorf("error while send tunnel pkg : %+v", err)
_ = c.Close()
_ = dst.Close()
}
h.Infof("called newProxyConn for %v", localAddr)
h.Infof("called newProxyConn for %+v", localAddr)
idx := a.joinedConn.Add(nil, c, conn.NewWrappedConn("server", dst))
conn.JoinConn(c.GetConn(), dst)
_ = a.joinedConn.Remove(idx)
Expand All @@ -61,12 +61,12 @@ CONNECT:
a.status = "INIT"
a.adminConn.ResetConn(c)
if err := a.SendControlConnRegisterPkg(); err != nil {
h.Errorf("can not send register pkg to server %v, error: %v", a.addr, err)
h.Errorf("can not send register pkg to server %+v, error: %+v", a.addr, err)
_ = c.Close()
time.Sleep(time.Duration(dur) * time.Second)
goto CONNECT
}
h.Infof("init control connection to server %v success", a.addr)
h.Infof("init control connection to server %+v success", a.addr)
a.status = "RUNNING"

}
Expand All @@ -82,7 +82,7 @@ func (a *Agent) ControlConnHeartBeatSendLoop(dur int, ctx context.Context) {
}
if err := a.sendHeartBeatPkg(); err != nil {
_ = a.adminConn.Close()
h.Errorf("send heartbeat error: %v, sleep %v s and try again", err, dur)
h.Errorf("send heartbeat error: %+v, sleep %+v s and try again", err, dur)

} else {
a.lastAckSendTime = time.Now()
Expand All @@ -108,7 +108,7 @@ func (a *Agent) handleAdminConnection(ctx context.Context) {
default:
}
if err := a.adminConn.Receive(&msg); err != nil {
h.Errorf("receive from admin conn error: %v, call reconnecting", err)
h.Errorf("receive from admin conn error: %+v, call reconnecting", err)
_ = a.adminConn.Close()
time.Sleep(time.Second)
a.initControlConn(1)
Expand All @@ -117,22 +117,22 @@ func (a *Agent) handleAdminConnection(ctx context.Context) {
switch msg.ReqType {
case model.PkgTunnelBegin:
m, _ := model.ParseTunnelBeginPkg(msg.Message)
h.Infof("got PkgDataConnTunnel for : %v", m.LocalAddr)
h.Infof("got PkgDataConnTunnel for : %+v", m.LocalAddr)
go a.newProxyConn(m.LocalAddr)
case model.PkgReqHeartBeatPong:
a.lastAckRcvTime = time.Now()

case model.PkgAuthenticationFail:
m, _ := model.ParseAuthenticationFailMsg(msg.Message)
h.Fatalf("authentication fail: %v", m)
h.Fatalf("authentication fail: %+v", m)

case model.PkgUDPData:
err := util.SendUDP(msg.To, msg.Message)
if err != nil {
h.Errorf("send udp data to %v error: %v", msg.To, err)
h.Errorf("send udp data to %+v error: %+v", msg.To, err)
}
default:
h.Errorf("got unknown ReqType: %v, message is: %v", msg.ReqType, string(msg.Message))
h.Errorf("got unknown ReqType: %+v, message is: %+v", msg.ReqType, string(msg.Message))
_ = a.adminConn.Close()
}
}
Expand Down
10 changes: 5 additions & 5 deletions agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
Long: `list anywhere conns.`,
Run: func(cmd *cobra.Command, args []string) {
if err := handler.ListConns(grpcAddress); err != nil {
fmt.Printf("error query conn list: %v\n", err)
fmt.Printf("error query conn list: %+v\n", err)
}
},
}
Expand All @@ -73,7 +73,7 @@ func main() {
Long: `kill anywhere conn.`,
Run: func(cmd *cobra.Command, args []string) {
if err := handler.KillConn(grpcAddress, connIdToKill); err != nil {
fmt.Printf("error query agent list: %v\n", err)
fmt.Printf("error query agent list: %+v\n", err)
}
},
}
Expand All @@ -83,7 +83,7 @@ func main() {
Long: `flush anywhere conn.`,
Run: func(cmd *cobra.Command, args []string) {
if err := handler.FlushConns(grpcAddress); err != nil {
fmt.Printf("error query agent list: %v\n", err)
fmt.Printf("error query agent list: %+v\n", err)
}
},
}
Expand All @@ -93,7 +93,7 @@ func main() {
Long: `show agent status`,
Run: func(cmd *cobra.Command, args []string) {
if err := handler.ShowStatus(grpcAddress); err != nil {
fmt.Printf("error query agent status: %v\n", err)
fmt.Printf("error query agent status: %+v\n", err)
}
},
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func run(_ *cobra.Command, _ []string) error {

select {
case err := <-rpcExitChan:
h.Fatalf("Grpc existing unexpected: %v", err)
h.Fatalf("Grpc existing unexpected: %+v", err)
case <-serverExitChan:
h.Infof("Agent Existing")
}
Expand Down
59 changes: 56 additions & 3 deletions conn/conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package conn

import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"net"
Expand All @@ -10,6 +13,48 @@ import (

var ErrNilConn = fmt.Errorf("empty net.Conn")

// encode 将消息编码
func encode(message []byte) ([]byte, error) {
// 读取消息的长度,转换成int32类型(占4个字节)
var length = int32(len(message))
var pkg = new(bytes.Buffer)
// 写入消息头
err := binary.Write(pkg, binary.LittleEndian, length)
if err != nil {
return nil, err
}
// 写入消息实体
err = binary.Write(pkg, binary.LittleEndian, message)
if err != nil {
return nil, err
}
return pkg.Bytes(), nil
}

// decode 解码消息
func decode(reader *bufio.Reader) ([]byte, error) {
// 读取消息的长度
lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据
lengthBuff := bytes.NewBuffer(lengthByte)
var length int32
err := binary.Read(lengthBuff, binary.LittleEndian, &length)
if err != nil {
return nil, err
}
// Buffered返回缓冲中现有的可读取的字节数。
if int32(reader.Buffered()) < length+4 {
return nil, fmt.Errorf("size not enough")
}

// 读取真正的消息数据
pack := make([]byte, int(4+length))
_, err = reader.Read(pack)
if err != nil {
return nil, err
}
return pack[4:], nil
}

type WrappedConn struct {
RemoteName string
connRwMu sync.RWMutex
Expand Down Expand Up @@ -37,7 +82,11 @@ func (c *WrappedConn) Send(m interface{}) error {
if err != nil {
return err
}
if _, err := c.Conn.Write(p); err != nil {
msg, err := encode(p)
if err != nil {
return err
}
if _, err := c.Conn.Write(msg); err != nil {
return err
}
return nil
Expand All @@ -49,8 +98,12 @@ func (c *WrappedConn) Receive(rsp interface{}) error {
if c.Conn == nil {
return ErrNilConn
}
d := json.NewDecoder(c.Conn)
if err := d.Decode(&rsp); err != nil {
reader := bufio.NewReader(c.Conn)
msg, err := decode(reader)
if err != nil {
return err
}
if err := json.Unmarshal(msg, &rsp); err != nil {
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion conn/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func JoinConn(remote, local net.Conn) (uint64, uint64) {
var localToRemoteBytes, remoteToLocalBytes int64
go joinWithClose(remote, local, &localToRemoteBytes)
go joinWithClose(local, remote, &remoteToLocalBytes)
log.Infof(h, "joined conn %v and %v", remote.LocalAddr(), local.RemoteAddr())
log.Infof(h, "joined conn %+v and %+v", remote.LocalAddr(), local.RemoteAddr())
wg.Wait()
return uint64(localToRemoteBytes), uint64(remoteToLocalBytes)
}
17 changes: 9 additions & 8 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (

"github.com/cntechpower/utils/tracing"

"github.com/cntechpower/anywhere/constants"
"github.com/cntechpower/utils/log"

"github.com/cntechpower/anywhere/constants"
)

var ErrConnectionPoolFull = fmt.Errorf("connection pool is full")
Expand Down Expand Up @@ -53,8 +54,8 @@ func (p *connectionPool) Get(ctx context.Context, proxyAddr string) (c *WrappedC
}
p.mu.Unlock()
for i := 0; i < constants.ProxyConnGetMaxRetryCount; i++ {
_ = tracing.Do(ctxNew, fmt.Sprintf("connectionPool.Get.Wait-%v", i), func() error {
//get connection first
_ = tracing.Do(ctxNew, fmt.Sprintf("connectionPool.Get.Wait-%+v", i), func() error {
// get connection first
p.newConnectionFn(proxyAddr)
select {
case c = <-p.pool[proxyAddr]:
Expand All @@ -69,7 +70,7 @@ func (p *connectionPool) Get(ctx context.Context, proxyAddr string) (c *WrappedC
}
ext.HTTPStatusCode.Set(span, http.StatusRequestTimeout)
ext.Error.Set(span, true)
return nil, fmt.Errorf("timeout while waiting for proxy conn for %v", proxyAddr)
return nil, fmt.Errorf("timeout while waiting for proxy conn for %+v", proxyAddr)
}

func (p *connectionPool) Put(ctx context.Context, proxyAddr string, connection *WrappedConn) error {
Expand Down Expand Up @@ -97,14 +98,14 @@ func (p *connectionPool) houseKeeper() {
checkedMap := make(map[*WrappedConn]struct{}, len(pool))
select {
case c := <-p.pool[proxyAddr]:
//this connection is already checked
//because channel is FIFO, so that means all connection in channel has been checked.
// this connection is already checked
// because channel is FIFO, so that means all connection in channel has been checked.
if _, ok := checkedMap[c]; ok {
break
}
checkedMap[c] = struct{}{}
if c.CreateTime.Add(p.idleTimeout).Before(time.Now()) { //connection is exceeded idle timeout, closing it.
log.Infof(h, "connection for %v is exceed idle timeout, will close it.", proxyAddr)
if c.CreateTime.Add(p.idleTimeout).Before(time.Now()) { // connection is exceeded idle timeout, closing it.
log.Infof(h, "connection for %+v is exceed idle timeout, will close it.", proxyAddr)
_ = c.Close()
} else {
p.pool[proxyAddr] <- c
Expand Down
5 changes: 3 additions & 2 deletions dao/config/dao.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package config

import (
"github.com/cntechpower/utils/log"

"github.com/cntechpower/anywhere/dao"
"github.com/cntechpower/anywhere/model"
"github.com/cntechpower/utils/log"
)

func Save(config *model.ProxyConfig) (err error) {
Expand Down Expand Up @@ -44,7 +45,7 @@ func Migrate() (err error) {
for _, u := range cs.ProxyConfigs {
for _, c := range u {
if err = Save(c); err != nil {
h.Errorf("save %+v to db error: %v", c, err)
h.Errorf("save %+v to db error: %+v", c, err)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions dao/connlist/joinedConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (l *JoinedConnList) KillById(id uint) (err error) {
l.listMu.Lock()
defer l.listMu.Unlock()
if c, exist := l.list[id]; !exist {
return fmt.Errorf("no such id %v", id)
return fmt.Errorf("no such id %+v", id)
} else {
_ = c.src.Close()
_ = c.dst.Close()
Expand All @@ -87,7 +87,7 @@ func (l *JoinedConnList) Remove(id uint) (err error) {
l.listMu.Lock()
defer l.listMu.Unlock()
if _, exist := l.list[id]; !exist {
return fmt.Errorf("no such id %v", id)
return fmt.Errorf("no such id %+v", id)
} else {
delete(l.list, id)
}
Expand Down
4 changes: 2 additions & 2 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ type GroupConnList struct {

func NewProxyConfig(userName, zoneName string, remotePort int, localAddr string, isWhiteListOn bool, whiteListIps, listenType string) (*ProxyConfig, error) {
if err := util.CheckPortValid(remotePort); err != nil {
return nil, fmt.Errorf("invalid remoteAddr %v in config, error: %v", localAddr, err)
return nil, fmt.Errorf("invalid remoteAddr %+v in config, error: %+v", localAddr, err)
}
if err := util.CheckAddrValid(localAddr); err != nil {
return nil, fmt.Errorf("invalid localAddr %v in config, error: %v", localAddr, err)
return nil, fmt.Errorf("invalid localAddr %+v in config, error: %+v", localAddr, err)
}
return &ProxyConfig{
UserName: userName,
Expand Down
8 changes: 4 additions & 4 deletions model/proxyConfig_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (ph *ProxyConfigHeap) IsValid() bool {
}
for i := n/2 - 1; i >= 0; i-- {
if ph.less(ph.list[2*i+1], ph.list[i]) || (2*i+2 < n && ph.less(ph.list[2*i+2], ph.list[i])) {
fmt.Printf("invalid list[%v]: %v,list[%v]: %v,list[%v]: %v,\n", i, ph.list[i], 2*i+1, ph.list[2*i+1], 2*i+2, ph.list[2*i+2])
fmt.Printf("invalid list[%+v]: %+v,list[%+v]: %+v,list[%+v]: %+v,\n", i, ph.list[i], 2*i+1, ph.list[2*i+1], 2*i+2, ph.list[2*i+2])
return false
}
}
Expand All @@ -51,7 +51,7 @@ func (ph *ProxyConfigHeap) Pop() *ProxyConfig {
ph.down(0, len(ph.list)-1)
c := ph.list[n]
ph.list = ph.list[0:n]
// fmt.Printf("checking valid of ph: %v\n", ph.IsValid())
// fmt.Printf("checking valid of ph: %+v\n", ph.IsValid())
return c

}
Expand Down Expand Up @@ -101,8 +101,8 @@ func InitProxyConfigHeap(cs []*ProxyConfig, less func(i, j *ProxyConfig) bool, l
}
for _, c := range cs {
ph.Push(c)
// fmt.Printf("after push %v to heap result %v, heap is %v\n", c.NetworkFlowRemoteToLocalInBytes, ok, ph.debugPrint())
// fmt.Printf("after push %+v to heap result %+v, heap is %+v\n", c.NetworkFlowRemoteToLocalInBytes, ok, ph.debugPrint())
}
// fmt.Printf("checking valid of ph: %v\n", ph.IsValid())
// fmt.Printf("checking valid of ph: %+v\n", ph.IsValid())
return ph
}
Loading

0 comments on commit 446f38b

Please sign in to comment.