Skip to content

Commit

Permalink
tunnel: add connection statistics (#299)
Browse files Browse the repository at this point in the history
Add API to get number of the connections.
  • Loading branch information
at-wat authored Jul 8, 2021
1 parent d5da9f3 commit 791fc7a
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 166 deletions.
39 changes: 36 additions & 3 deletions tunnel/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,31 @@ package tunnel

import (
"io"
"sync"

"google.golang.org/protobuf/proto"

"github.com/seqsense/aws-iot-device-sdk-go/v6/internal/ioterr"
"github.com/seqsense/aws-iot-device-sdk-go/v6/tunnel/msg"
)

func proxyDestination(ws io.ReadWriter, dialer Dialer, eh ErrorHandler) error {
func proxyDestination(ws io.ReadWriter, dialer Dialer, eh ErrorHandler, stat Stat) error {
var muConns sync.Mutex
conns := make(map[int32]io.ReadWriteCloser)
sz := make([]byte, 2)
b := make([]byte, 8192)

updateStat := func() {
if stat != nil {
muConns.Lock()
n := len(conns)
muConns.Unlock()
stat.Update(func(stat *Statistics) {
stat.NumConn = n
})
}
}

for {
if _, err := io.ReadFull(ws, sz); err != nil {
if err == io.EOF {
Expand Down Expand Up @@ -62,28 +76,47 @@ func proxyDestination(ws io.ReadWriter, dialer Dialer, eh ErrorHandler) error {
continue
}

muConns.Lock()
conns[m.StreamId] = conn
go readProxy(ws, conn, m.StreamId, eh)
muConns.Unlock()
go func() {
readProxy(ws, conn, m.StreamId, eh)
muConns.Lock()
if conn, ok := conns[m.StreamId]; ok {
_ = conn.Close()
delete(conns, m.StreamId)
}
muConns.Unlock()
updateStat()
}()

case msg.Message_STREAM_RESET:
muConns.Lock()
if conn, ok := conns[m.StreamId]; ok {
_ = conn.Close()
delete(conns, m.StreamId)
}
muConns.Unlock()

case msg.Message_SESSION_RESET:
muConns.Lock()
for id, c := range conns {
_ = c.Close()
delete(conns, id)
}
muConns.Unlock()
return io.EOF

case msg.Message_DATA:
if conn, ok := conns[m.StreamId]; ok {
muConns.Lock()
conn, ok := conns[m.StreamId]
muConns.Unlock()
if ok {
if _, err := conn.Write(m.Payload); err != nil {
eh.HandleError(ioterr.New(err, "writing message"))
}
}
}
updateStat()
}
}
13 changes: 11 additions & 2 deletions tunnel/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func ProxyDestination(dialer Dialer, endpoint, token string, opts ...ProxyOption
pingCancel := newPinger(ws, opt.PingPeriod)
defer pingCancel()

return proxyDestination(ws, dialer, opt.ErrorHandler)
return proxyDestination(ws, dialer, opt.ErrorHandler, opt.Stat)
}

// ProxySource proxies TCP connection from local socket to
Expand All @@ -72,7 +72,7 @@ func ProxySource(listener net.Listener, endpoint, token string, opts ...ProxyOpt
pingCancel := newPinger(ws, opt.PingPeriod)
defer pingCancel()

return proxySource(ws, listener, opt.ErrorHandler)
return proxySource(ws, listener, opt.ErrorHandler, opt.Stat)
}

func openProxyConn(endpoint, mode, token string, opts ...ProxyOption) (*websocket.Conn, *ProxyOptions, error) {
Expand Down Expand Up @@ -140,6 +140,7 @@ type ProxyOptions struct {
Scheme string
ErrorHandler ErrorHandler
PingPeriod time.Duration
Stat Stat
}

func (o *ProxyOptions) validate() error {
Expand All @@ -166,3 +167,11 @@ func WithPingPeriod(d time.Duration) ProxyOption {
return nil
}
}

// WithStat enables statistics.
func WithStat(stat Stat) ProxyOption {
return func(opt *ProxyOptions) error {
opt.Stat = stat
return nil
}
}
Loading

0 comments on commit 791fc7a

Please sign in to comment.