diff --git a/README.md b/README.md index e1ccdca..79d9e75 100644 --- a/README.md +++ b/README.md @@ -15,4 +15,6 @@ Usage of ./readnetfs: Serve files from the src directory -src string Directory to serve files from + -statsd string + Statsd server address and port in x.x.x.x:port format ``` diff --git a/common/dummycon.go b/common/dummycon.go new file mode 100644 index 0000000..dde1708 --- /dev/null +++ b/common/dummycon.go @@ -0,0 +1,41 @@ +package common + +import ( + "net" + "time" +) + +type DummyConn struct { +} + +func (d DummyConn) Read(b []byte) (n int, err error) { + return 0, nil +} + +func (d DummyConn) Write(b []byte) (n int, err error) { + return 0, nil +} + +func (d DummyConn) Close() error { + return nil +} + +func (d DummyConn) LocalAddr() net.Addr { + return nil +} + +func (d DummyConn) RemoteAddr() net.Addr { + return nil +} + +func (d DummyConn) SetDeadline(t time.Time) error { + return nil +} + +func (d DummyConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (d DummyConn) SetWriteDeadline(t time.Time) error { + return nil +} diff --git a/common/statsd.go b/common/statsd.go new file mode 100644 index 0000000..288016b --- /dev/null +++ b/common/statsd.go @@ -0,0 +1,73 @@ +package common + +import ( + "fmt" + "net" + "strings" + "time" +) + +type StatsdConn struct { + Conn net.Conn + statsdSocket net.Conn +} + +func WrapStatsdConn(conn net.Conn, statsdSocket net.Conn) *StatsdConn { + return &StatsdConn{Conn: conn, statsdSocket: statsdSocket} +} + +func addrToLoggableIp(addr net.Addr) string { + var ip string + switch addr := addr.(type) { + case *net.UDPAddr: + ip = addr.IP.String() + case *net.TCPAddr: + ip = addr.IP.String() + } + + return strings.ReplaceAll(ip, ".", "-") +} + +func (s StatsdConn) Read(b []byte) (n int, err error) { + n, err = s.Conn.Read(b) + if err != nil { + _, _ = fmt.Fprintf(s.statsdSocket, "net.%s.receive_errors:1|c\n", addrToLoggableIp(s.RemoteAddr())) + } else { + _, _ = fmt.Fprintf(s.statsdSocket, "net.%s.received:%d|c\n", addrToLoggableIp(s.RemoteAddr()), n) + } + return n, err +} + +func (s StatsdConn) Write(b []byte) (n int, err error) { + n, err = s.Conn.Write(b) + if err != nil { + _, _ = fmt.Fprintf(s.statsdSocket, "net.%s.transmit_errors:1|c\n", addrToLoggableIp(s.RemoteAddr())) + } else { + _, _ = fmt.Fprintf(s.statsdSocket, "net.%s.transmitted:%d|c\n", addrToLoggableIp(s.RemoteAddr()), n) + } + return n, err +} + +func (s StatsdConn) Close() error { + return s.Conn.Close() +} + +func (s StatsdConn) LocalAddr() net.Addr { + return s.Conn.LocalAddr() +} + +func (s StatsdConn) RemoteAddr() net.Addr { + return s.Conn.RemoteAddr() +} + +func (s StatsdConn) SetDeadline(t time.Time) error { + return s.Conn.SetDeadline(t) +} + +func (s StatsdConn) SetReadDeadline(t time.Time) error { + return s.Conn.SetReadDeadline(t) +} + +func (s StatsdConn) SetWriteDeadline(t time.Time) error { + return s.Conn.SetWriteDeadline(t) +} diff --git a/fileretriever/fileclient.go b/fileretriever/fileclient.go index 7aa244b..48f0359 100644 --- a/fileretriever/fileclient.go +++ b/fileretriever/fileclient.go @@ -3,6 +3,7 @@ package fileretriever import ( "context" "errors" + "fmt" "github.com/hanwen/go-fuse/v2/fuse" "github.com/hashicorp/golang-lru/v2" "github.com/hashicorp/golang-lru/v2/expirable" @@ -13,6 +14,7 @@ import ( "net" "os" "readnetfs/cache" + "readnetfs/common" "strings" "sync" "time" @@ -71,9 +73,10 @@ type FileClient struct { fplock sync.Mutex fInfoCache *expirable.LRU[RemotePath, *FInfo] fDirCache *expirable.LRU[RemotePath, *[]fuse.DirEntry] + statsdSocket net.Conn } -func NewFileClient(srcDir string, peerNodes []string) *FileClient { +func NewFileClient(srcDir string, peerNodes []string, statsdAddrPort string) *FileClient { fcache, _ := lru.New[RemotePath, *cache.CachedFile](cache.MEM_TOTAL_CACHE_B / cache.MEM_PER_FILE_CACHE_B) fPathRemoteCache := expirable.NewLRU[RemotePath, []string](cache.MEM_TOTAL_CACHE_B/cache.MEM_PER_FILE_CACHE_B, func(key RemotePath, value []string) {}, PATH_TTL) @@ -83,8 +86,22 @@ func NewFileClient(srcDir string, peerNodes []string) *FileClient { for _, peer := range peerNodes { pMap[peer] = &PeerInfo{CurrentRequests: semaphore.NewWeighted(int64(MAX_CONCURRENT_REQUESTS))} } + + statsdSocket := func() net.Conn { + if statsdAddrPort != "" { + socket, err := net.Dial("udp", statsdAddrPort) + if err != nil { + log.Warn().Err(err).Msg("Failed to establish statsd connection") + return common.DummyConn{} + } + return socket + } else { + return common.DummyConn{} + } + }() + return &FileClient{srcDir: srcDir, peerNodes: pMap, iMap: make(map[RemotePath]uint64), fcache: fcache, - fPathRemoteCache: fPathRemoteCache, fInfoCache: fInfoCache, fDirCache: fDirCache} + fPathRemoteCache: fPathRemoteCache, fInfoCache: fInfoCache, fDirCache: fDirCache, statsdSocket: statsdSocket} } func (f *FileClient) GetCachedFile(path RemotePath) *cache.CachedFile { @@ -191,6 +208,7 @@ func (f *FileClient) netFileInfoDir(path RemotePath, peer string) (*DirFInfo, er log.Warn().Err(err).Msg("Failed to get peer conn") return nil, err } + conn = common.WrapStatsdConn(conn, f.statsdSocket) err = conn.SetDeadline(time.Now().Add(DEADLINE)) if err != nil { log.Warn().Err(err).Msg("Failed to set deadline") @@ -207,6 +225,7 @@ func (f *FileClient) netFileInfoDir(path RemotePath, peer string) (*DirFInfo, er Length: 0, Path: string(path), } + _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.read_dir_finfo:1|c\n") err = struc.Pack(conn, request) if err != nil { log.Debug().Err(err).Msg("Failed to pack request") @@ -251,6 +270,7 @@ func (f *FileClient) netFileInfo(path RemotePath, peer string) (*FInfo, error) { log.Warn().Err(err).Msg("Failed to get peer conn") return nil, err } + conn = common.WrapStatsdConn(conn, f.statsdSocket) err = conn.SetDeadline(time.Now().Add(DEADLINE)) if err != nil { log.Warn().Err(err).Msg("Failed to set deadline") @@ -267,6 +287,7 @@ func (f *FileClient) netFileInfo(path RemotePath, peer string) (*FInfo, error) { Length: 0, Path: string(path), } + _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.file_info:1|c\n") err = struc.Pack(conn, request) if err != nil { log.Debug().Err(err).Msg("Failed to pack request") @@ -320,6 +341,7 @@ func (f *FileClient) netRead(path RemotePath, offset int64, length int64) ([]byt log.Debug().Err(err).Msg("Failed to get peer conn") return nil, err } + conn = common.WrapStatsdConn(conn, f.statsdSocket) if err != nil { log.Warn().Err(err).Msg("Failed to acquire semaphore") return nil, err @@ -349,6 +371,7 @@ func (f *FileClient) netRead(path RemotePath, offset int64, length int64) ([]byt Length: length, Path: string(path), } + _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.read_content:1|c\n") err = struc.Pack(conn, request) if err != nil { log.Debug().Err(err).Msg("Failed to pack request") @@ -536,6 +559,7 @@ func (f *FileClient) netReadDir(path RemotePath, peer string) ([]fuse.DirEntry, log.Warn().Err(err).Msg("Failed to get peer conn") return nil, err } + conn = common.WrapStatsdConn(conn, f.statsdSocket) defer conn.Close() err = conn.SetDeadline(time.Now().Add(DEADLINE)) if err != nil { @@ -552,6 +576,7 @@ func (f *FileClient) netReadDir(path RemotePath, peer string) ([]fuse.DirEntry, Length: 0, Path: string(path), } + _, _ = fmt.Fprintf(f.statsdSocket, "requests.outgoing.readdir_content:1|c\n") err = struc.Pack(conn, request) if err != nil { log.Warn().Err(err).Msg("Failed to write request") diff --git a/fileretriever/fileserver.go b/fileretriever/fileserver.go index f8ceaac..f57fd0e 100644 --- a/fileretriever/fileserver.go +++ b/fileretriever/fileserver.go @@ -2,6 +2,7 @@ package fileretriever import ( "context" + "fmt" "github.com/lunixbochs/struc" "github.com/rs/zerolog/log" "golang.org/x/time/rate" @@ -10,6 +11,7 @@ import ( "net" "os" "readnetfs/cache" + "readnetfs/common" "strings" "time" ) @@ -56,6 +58,7 @@ func NewFileServer(srcDir string, bind string, fclient *FileClient, rateLimit in } func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) { + _, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.readdir_content:1|c\n") path := f.srcDir + "/" + request.Path root := os.DirFS(path) entries, err := fs.ReadDir(root, ".") @@ -83,6 +86,7 @@ func (f *FileServer) handleDirRequest(conn net.Conn, request *FileRequest) { } func (f *FileServer) handleFileRequest(conn net.Conn, request *FileRequest) { + _, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.read_content:1|c\n") log.Printf("Trying to read %d bytes at %d from file %s", request.Length, request.Offset, request.Path) start := time.Now() err := f.limiter.Wait(context.Background()) @@ -106,6 +110,7 @@ func (f *FileServer) handleFileRequest(conn net.Conn, request *FileRequest) { } func (f *FileServer) handleGetFileInfo(conn net.Conn, request *FileRequest) { + _, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.file_info:1|c\n") fInfo, err := f.fclient.localFileInfo(RemotePath(request.Path)) if err != nil { log.Debug().Err(err).Msgf("Failed to read local file info for %s", request.Path) @@ -119,6 +124,7 @@ func (f *FileServer) handleGetFileInfo(conn net.Conn, request *FileRequest) { } func (f *FileServer) handleDirFInfo(conn net.Conn, request *FileRequest) { + _, _ = fmt.Fprintf(f.fclient.statsdSocket, "requests.incoming.read_dir_finfo:1|c\n") path := f.fclient.Re2Lo(RemotePath(request.Path)) root := os.DirFS(path.String()) entries, err := fs.ReadDir(root, ".") @@ -158,6 +164,7 @@ func (f *FileServer) handleDirFInfo(conn net.Conn, request *FileRequest) { } func (f *FileServer) handleConn(conn net.Conn) { + conn = common.WrapStatsdConn(conn, f.fclient.statsdSocket) defer conn.Close() err := conn.SetDeadline(time.Now().Add(10 * time.Second)) if err != nil { diff --git a/readnetfs.go b/readnetfs.go index 87e1ecb..f1e7553 100644 --- a/readnetfs.go +++ b/readnetfs.go @@ -150,12 +150,13 @@ func main() { send := flag.Bool("send", false, "Serve files from the src directory") receive := flag.Bool("receive", false, "Receive files and mount the net filesystem on the mnt directory") rateLimit := flag.Int("rate", 1000, "rate limit in Mbit/s") + statsdAddrPort := flag.String("statsd", "", "Statsd server address and port in x.x.x.x:port format") flag.Parse() log.Debug().Msg("peers: " + strings.Join(PeerNodes, ", ")) log.Debug().Msg("bind: " + *bindAddrPort) - fclient := fileretriever.NewFileClient(*srcDir, PeerNodes) + fclient := fileretriever.NewFileClient(*srcDir, PeerNodes, *statsdAddrPort) if !*send && !*receive { log.Fatal().Msg("Must specify either send or receive or both")