Skip to content

Commit

Permalink
fuse passthrough: fix cache files closed by lru in passthrough model
Browse files Browse the repository at this point in the history
In passthough model, close will be toke over by go-fuse, so file.Close is unnecessary

Signed-off-by: abushwang <abushwangs@gmail.com>
  • Loading branch information
wswsmao committed Nov 27, 2024
1 parent a0de196 commit 1da63ca
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
24 changes: 21 additions & 3 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ type Writer interface {
}

type cacheOpt struct {
direct bool
direct bool
passThrough bool
}

type Option func(o *cacheOpt) *cacheOpt
Expand All @@ -113,6 +114,15 @@ func Direct() Option {
}
}

// PassThrough option indicates whether to enable FUSE passthrough mode
// to improve local file read performance.
func PassThrough() Option {
return func(o *cacheOpt) *cacheOpt {
o.passThrough = true
return o
}
}

func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
if !filepath.IsAbs(directory) {
return nil, fmt.Errorf("dir cache path must be an absolute path; got %q", directory)
Expand Down Expand Up @@ -232,8 +242,16 @@ func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
// that won't be accessed immediately.
if dc.direct || opt.direct {
return &reader{
ReaderAt: file,
closeFunc: func() error { return file.Close() },
ReaderAt: file,
closeFunc: func() error {
// In passthough model, close will be toke over by go-fuse
// If "passThrough" option is specified, "direct" option also will
// be specified, so adding this branch here is enough
if opt.passThrough {
return nil
}
return file.Close()
},
}, nil
}

Expand Down
4 changes: 3 additions & 1 deletion fs/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,8 @@ func (sf *file) GetPassthroughFd() (uintptr, error) {
id := genID(sf.id, firstChunkOffset, totalSize)

for {
r, err := sf.gr.cache.Get(id)
// cache.PassThrough() is necessary to take over files
r, err := sf.gr.cache.Get(id, cache.PassThrough())
if err != nil {
if err := sf.prefetchEntireFile(); err != nil {
return 0, err
Expand Down Expand Up @@ -563,6 +564,7 @@ func (sf *file) prefetchEntireFile() error {
ip := b.Bytes()[:chunkSize]

// Check if the content exists in the cache
// Just read it and merge to a new files, so cache.PassThrough() should not be used here
if r, err := sf.gr.cache.Get(id); err == nil {
n, err := r.ReadAt(ip, 0)
if (err == nil || err == io.EOF) && int64(n) == chunkSize {
Expand Down

0 comments on commit 1da63ca

Please sign in to comment.