Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
likeazir committed Aug 31, 2023
2 parents 9f2c79c + 1127206 commit dc2701e
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ Usage of ./readnetfs:
Serve files from the src directory
-src string
Directory to serve files from
-statsd string
Statsd server address and port in x.x.x.x:port format
```
41 changes: 41 additions & 0 deletions common/dummycon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package common

import (
"net"
"time"
)

type DummyConn struct {
}

func (d DummyConn) Read(b []byte) (n int, err error) {
return 0, nil
}

func (d DummyConn) Write(b []byte) (n int, err error) {
return 0, nil
}

func (d DummyConn) Close() error {
return nil
}

func (d DummyConn) LocalAddr() net.Addr {
return nil
}

func (d DummyConn) RemoteAddr() net.Addr {
return nil
}

func (d DummyConn) SetDeadline(t time.Time) error {
return nil
}

func (d DummyConn) SetReadDeadline(t time.Time) error {
return nil
}

func (d DummyConn) SetWriteDeadline(t time.Time) error {
return nil
}
73 changes: 73 additions & 0 deletions common/statsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package common

import (
"fmt"
"net"
"strings"
"time"
)

type StatsdConn struct {
Conn net.Conn
statsdSocket net.Conn
}

func WrapStatsdConn(conn net.Conn, statsdSocket net.Conn) *StatsdConn {
return &StatsdConn{Conn: conn, statsdSocket: statsdSocket}
}

func addrToLoggableIp(addr net.Addr) string {
var ip string
switch addr := addr.(type) {
case *net.UDPAddr:
ip = addr.IP.String()
case *net.TCPAddr:
ip = addr.IP.String()
}

return strings.ReplaceAll(ip, ".", "-")
}

func (s StatsdConn) Read(b []byte) (n int, err error) {
n, err = s.Conn.Read(b)
if err != nil {
_, _ = fmt.Fprintf(s.statsdSocket, "net.%s.receive_errors:1|c\n", addrToLoggableIp(s.RemoteAddr()))
} else {
_, _ = fmt.Fprintf(s.statsdSocket, "net.%s.received:%d|c\n", addrToLoggableIp(s.RemoteAddr()), n)
}
return n, err
}

func (s StatsdConn) Write(b []byte) (n int, err error) {
n, err = s.Conn.Write(b)
if err != nil {
_, _ = fmt.Fprintf(s.statsdSocket, "net.%s.transmit_errors:1|c\n", addrToLoggableIp(s.RemoteAddr()))
} else {
_, _ = fmt.Fprintf(s.statsdSocket, "net.%s.transmitted:%d|c\n", addrToLoggableIp(s.RemoteAddr()), n)
}
return n, err
}

func (s StatsdConn) Close() error {
return s.Conn.Close()
}

func (s StatsdConn) LocalAddr() net.Addr {
return s.Conn.LocalAddr()
}

func (s StatsdConn) RemoteAddr() net.Addr {
return s.Conn.RemoteAddr()
}

func (s StatsdConn) SetDeadline(t time.Time) error {
return s.Conn.SetDeadline(t)
}

func (s StatsdConn) SetReadDeadline(t time.Time) error {
return s.Conn.SetReadDeadline(t)
}

func (s StatsdConn) SetWriteDeadline(t time.Time) error {
return s.Conn.SetWriteDeadline(t)
}
29 changes: 27 additions & 2 deletions fileretriever/fileclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fileretriever
import (
"context"
"errors"
"fmt"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/expirable"
Expand All @@ -13,6 +14,7 @@ import (
"net"
"os"
"readnetfs/cache"
"readnetfs/common"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -71,9 +73,10 @@ type FileClient struct {
fplock sync.Mutex
fInfoCache *expirable.LRU[RemotePath, *FInfo]
fDirCache *expirable.LRU[RemotePath, *[]fuse.DirEntry]
statsdSocket net.Conn
}

func NewFileClient(srcDir string, peerNodes []string) *FileClient {
func NewFileClient(srcDir string, peerNodes []string, statsdAddrPort string) *FileClient {
fcache, _ := lru.New[RemotePath, *cache.CachedFile](cache.MEM_TOTAL_CACHE_B / cache.MEM_PER_FILE_CACHE_B)
fPathRemoteCache := expirable.NewLRU[RemotePath, []string](cache.MEM_TOTAL_CACHE_B/cache.MEM_PER_FILE_CACHE_B,
func(key RemotePath, value []string) {}, PATH_TTL)
Expand All @@ -83,8 +86,22 @@ func NewFileClient(srcDir string, peerNodes []string) *FileClient {
for _, peer := range peerNodes {
pMap[peer] = &PeerInfo{CurrentRequests: semaphore.NewWeighted(int64(MAX_CONCURRENT_REQUESTS))}
}

statsdSocket := func() net.Conn {
if statsdAddrPort != "" {
socket, err := net.Dial("udp", statsdAddrPort)
if err != nil {
log.Warn().Err(err).Msg("Failed to establish statsd connection")
return common.DummyConn{}
}
return socket
} else {
return common.DummyConn{}
}
}()

return &FileClient{srcDir: srcDir, peerNodes: pMap, iMap: make(map[RemotePath]uint64), fcache: fcache,
fPathRemoteCache: fPathRemoteCache, fInfoCache: fInfoCache, fDirCache: fDirCache}
fPathRemoteCache: fPathRemoteCache, fInfoCache: fInfoCache, fDirCache: fDirCache, statsdSocket: statsdSocket}
}

func (f *FileClient) GetCachedFile(path RemotePath) *cache.CachedFile {
Expand Down Expand Up @@ -191,6 +208,7 @@ func (f *FileClient) netFileInfoDir(path RemotePath, peer string) (*DirFInfo, er
log.Warn().Err(err).Msg("Failed to get peer conn")
return nil, err
}
conn = common.WrapStatsdConn(conn, f.statsdSocket)
err = conn.SetDeadline(time.Now().Add(DEADLINE))
if err != nil {
log.Warn().Err(err).Msg("Failed to set deadline")
Expand All @@ -207,6 +225,7 @@ func (f *FileClient) netFileInfoDir(path RemotePath, peer string) (*DirFInfo, er
Length: 0,
Path: string(path),
}
_, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.read_dir_finfo:1|c\n")
err = struc.Pack(conn, request)
if err != nil {
log.Debug().Err(err).Msg("Failed to pack request")
Expand Down Expand Up @@ -251,6 +270,7 @@ func (f *FileClient) netFileInfo(path RemotePath, peer string) (*FInfo, error) {
log.Warn().Err(err).Msg("Failed to get peer conn")
return nil, err
}
conn = common.WrapStatsdConn(conn, f.statsdSocket)
err = conn.SetDeadline(time.Now().Add(DEADLINE))
if err != nil {
log.Warn().Err(err).Msg("Failed to set deadline")
Expand All @@ -267,6 +287,7 @@ func (f *FileClient) netFileInfo(path RemotePath, peer string) (*FInfo, error) {
Length: 0,
Path: string(path),
}
_, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.file_info:1|c\n")
err = struc.Pack(conn, request)
if err != nil {
log.Debug().Err(err).Msg("Failed to pack request")
Expand Down Expand Up @@ -320,6 +341,7 @@ func (f *FileClient) netRead(path RemotePath, offset int64, length int64) ([]byt
log.Debug().Err(err).Msg("Failed to get peer conn")
return nil, err
}
conn = common.WrapStatsdConn(conn, f.statsdSocket)
if err != nil {
log.Warn().Err(err).Msg("Failed to acquire semaphore")
return nil, err
Expand Down Expand Up @@ -349,6 +371,7 @@ func (f *FileClient) netRead(path RemotePath, offset int64, length int64) ([]byt
Length: length,
Path: string(path),
}
_, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.read_content:1|c\n")
err = struc.Pack(conn, request)
if err != nil {
log.Debug().Err(err).Msg("Failed to pack request")
Expand Down Expand Up @@ -536,6 +559,7 @@ func (f *FileClient) netReadDir(path RemotePath, peer string) ([]fuse.DirEntry,
log.Warn().Err(err).Msg("Failed to get peer conn")
return nil, err
}
conn = common.WrapStatsdConn(conn, f.statsdSocket)
defer conn.Close()
err = conn.SetDeadline(time.Now().Add(DEADLINE))
if err != nil {
Expand All @@ -552,6 +576,7 @@ func (f *FileClient) netReadDir(path RemotePath, peer string) ([]fuse.DirEntry,
Length: 0,
Path: string(path),
}
_, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.readdir_content:1|c\n")
err = struc.Pack(conn, request)
if err != nil {
log.Warn().Err(err).Msg("Failed to write request")
Expand Down
7 changes: 7 additions & 0 deletions fileretriever/fileserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fileretriever

import (
"context"
"fmt"
"github.com/lunixbochs/struc"
"github.com/rs/zerolog/log"
"golang.org/x/time/rate"
Expand All @@ -10,6 +11,7 @@ import (
"net"
"os"
"readnetfs/cache"
"readnetfs/common"
"strings"
"time"
)
Expand Down Expand Up @@ -56,6 +58,7 @@ func NewFileServer(srcDir string, bind string, fclient *FileClient, rateLimit in
}

func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) {
_, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.readdir_content:1|c\n")
path := f.srcDir + "/" + request.Path
root := os.DirFS(path)
entries, err := fs.ReadDir(root, ".")
Expand Down Expand Up @@ -83,6 +86,7 @@ func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) {
}

func (f *FileServer) handleFileRequest(conn net.Conn, request *FileRequest) {
_, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.read_content:1|c\n")
log.Printf("Trying to read %d bytes at %d from file %s", request.Length, request.Offset, request.Path)
start := time.Now()
err := f.limiter.Wait(context.Background())
Expand All @@ -106,6 +110,7 @@ func (f *FileServer) handleFileRequest(conn net.Conn, request *FileRequest) {
}

func (f *FileServer) handleGetFileInfo(conn net.Conn, request *FileRequest) {
_, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.file_info:1|c\n")
fInfo, err := f.fclient.localFileInfo(RemotePath(request.Path))
if err != nil {
log.Debug().Err(err).Msgf("Failed to read local file info for %s", request.Path)
Expand All @@ -119,6 +124,7 @@ func (f *FileServer) handleGetFileInfo(conn net.Conn, request *FileRequest) {
}

func (f *FileServer) handleDirFInfo(conn net.Conn, request *FileRequest) {
_, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.read_dir_finfo:1|c\n")
path := f.fclient.Re2Lo(RemotePath(request.Path))
root := os.DirFS(path.String())
entries, err := fs.ReadDir(root, ".")
Expand Down Expand Up @@ -158,6 +164,7 @@ func (f *FileServer) handleDirFInfo(conn net.Conn, request *FileRequest) {
}

func (f *FileServer) handleConn(conn net.Conn) {
conn = common.WrapStatsdConn(conn, f.fclient.statsdSocket)
defer conn.Close()
err := conn.SetDeadline(time.Now().Add(10 * time.Second))
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion readnetfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,13 @@ func main() {
send := flag.Bool("send", false, "Serve files from the src directory")
receive := flag.Bool("receive", false, "Receive files and mount the net filesystem on the mnt directory")
rateLimit := flag.Int("rate", 1000, "rate limit in Mbit/s")
statsdAddrPort := flag.String("statsd", "", "Statsd server address and port in x.x.x.x:port format")

flag.Parse()
log.Debug().Msg("peers: " + strings.Join(PeerNodes, ", "))
log.Debug().Msg("bind: " + *bindAddrPort)

fclient := fileretriever.NewFileClient(*srcDir, PeerNodes)
fclient := fileretriever.NewFileClient(*srcDir, PeerNodes, *statsdAddrPort)

if !*send && !*receive {
log.Fatal().Msg("Must specify either send or receive or both")
Expand Down

0 comments on commit dc2701e

Please sign in to comment.