Skip to content

Commit

Permalink
allow concurrent access, check fInfo for nil
Browse files Browse the repository at this point in the history
  • Loading branch information
likeazir committed Aug 31, 2023
1 parent c376989 commit 9f2c79c
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions readnetfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ import (
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sync/semaphore"
"os"
"readnetfs/cache"
"readnetfs/fileretriever"
"strings"
"sync"
"syscall"
)

var MAX_CONCURRENCY int64 = 10

type VirtNode struct {
fusefs.Inode
path fileretriever.RemotePath
mu sync.Mutex
sem *semaphore.Weighted
fc *fileretriever.FileClient
}

func (n *VirtNode) Open(ctx context.Context, openFlags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
n.mu.Lock()
defer n.mu.Unlock()
n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
return nil, 0, 0
}

func (n *VirtNode) Read(ctx context.Context, fh fusefs.FileHandle, dest []byte, off int64) (fuse.ReadResult, syscall.Errno) {
n.mu.Lock()
defer n.mu.Unlock()
n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
log.Trace().Msgf("Reading at %d from %s", off, n.path)
cacheEntry := n.fc.GetCachedFile(n.path)
if cacheEntry != nil {
Expand Down Expand Up @@ -68,14 +70,14 @@ func (n *VirtNode) Read(ctx context.Context, fh fusefs.FileHandle, dest []byte,
}

func (n *VirtNode) Write(ctx context.Context, fh fusefs.FileHandle, buf []byte, off int64) (uint32, syscall.Errno) {
n.mu.Lock()
defer n.mu.Unlock()
n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
return 0, 0
}

func (n *VirtNode) Getattr(ctx context.Context, fh fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno {
n.mu.Lock()
defer n.mu.Unlock()
n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
fInfo, err := n.fc.FileInfo(n.path)
if err != nil {
return syscall.EIO
Expand All @@ -86,12 +88,13 @@ func (n *VirtNode) Getattr(ctx context.Context, fh fusefs.FileHandle, out *fuse.
}

func (n *VirtNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (*fusefs.Inode, syscall.Errno) {
n.mu.Lock()
defer n.mu.Unlock()
n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
log.Debug().Msgf("Looking up %s in %s", name, n.path)
childpath := n.path.Append(name)
fInfo, err := n.fc.FileInfo(childpath)
if err != nil {
//TODO proper fix of nil finfo
if err != nil || fInfo == nil {
log.Debug().Err(err).Msgf("Failed to read file info for %s", childpath)
return nil, syscall.EIO
}
Expand All @@ -104,6 +107,7 @@ func (n *VirtNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut)
stable.Mode = uint32(fuse.S_IFREG)
}
cNode := &VirtNode{
sem: semaphore.NewWeighted(MAX_CONCURRENCY),
path: childpath,
fc: n.fc,
}
Expand All @@ -112,8 +116,8 @@ func (n *VirtNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut)
}

func (n *VirtNode) Readdir(ctx context.Context) (fusefs.DirStream, syscall.Errno) {
n.mu.Lock()
defer n.mu.Unlock()
n.sem.Acquire(ctx, 1)
defer n.sem.Release(1)
log.Trace().Msgf("Reading dir %s", n.path)
entries, err := n.fc.ReadDir(n.path)
if err != nil {
Expand Down Expand Up @@ -166,6 +170,7 @@ func main() {
os.Mkdir(*mntDir, 0755)
root := &VirtNode{
Inode: fusefs.Inode{},
sem: semaphore.NewWeighted(MAX_CONCURRENCY),
path: "",
fc: fclient,
}
Expand Down

0 comments on commit 9f2c79c

Please sign in to comment.