Skip to content

Commit

Permalink
Fix: Pyramid delete before open (#4062)
Browse files Browse the repository at this point in the history
* Fix: Pyramid delete before open

* Second attempt

* Fixes 1

* Fixes 2

* Fixes 3

* Fixes 4

* Update pkg/pyramid/file_tracker_test.go

Co-authored-by: itai-david <90712874+itai-david@users.noreply.github.com>

Co-authored-by: itai-david <90712874+itai-david@users.noreply.github.com>
  • Loading branch information
N-o-Z and itaidavid authored Sep 6, 2022
1 parent 32f9f4e commit d1873a0
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 25 deletions.
67 changes: 67 additions & 0 deletions pkg/pyramid/file_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package pyramid

import (
"sync"

"github.com/treeverse/lakefs/pkg/pyramid/params"
)

// fileTracker reference-counts open requests per file path in TierFS so that a
// delete requested by cache rejection/eviction is deferred until no opener
// still holds a reference to the path.
//
// All access to refMap happens with mu held. The delete callback is also
// invoked with mu held, so it must not call back into the tracker.
type fileTracker struct {
// refMap holds one entry per path (keyed by string(path)) that currently
// has at least one outstanding reference.
refMap map[string]*tracked
// mu guards refMap and the tracked entries it points to.
mu sync.Mutex
// delete performs the actual removal of a path once it is safe to do so.
delete deleteCallback
}

// deleteCallback is the function a fileTracker calls to actually remove path.
// It is called either directly from Delete (no outstanding references) or from
// the closer that releases the last reference of a path marked as deleted.
// In both cases the tracker's mutex is held during the call.
type deleteCallback func(path params.RelativePath)

// tracked is the per-path bookkeeping record kept by fileTracker.
type tracked struct {
// ref is the number of outstanding references: incremented by Open and
// decremented by each call of a returned closer.
ref int
// deleted is set when Delete was requested while ref > 0; the delete
// callback then runs when ref drops to zero.
deleted bool
}

// NewFileTracker returns an empty fileTracker that calls delete whenever a
// path can actually be removed, i.e. when Delete is requested for a path with
// no outstanding references, or when the last reference to a path already
// marked as deleted is released.
func NewFileTracker(delete deleteCallback) *fileTracker {
	tracker := new(fileTracker)
	tracker.refMap = make(map[string]*tracked)
	tracker.delete = delete
	return tracker
}

// Open registers one more reference to path and returns a closer that
// releases a reference when called.
//
// While a path has outstanding references, Delete only marks it; the delete
// callback runs when the last reference is released. The returned closer is
// not bound to this particular Open call: each invocation releases one
// reference of path, and it is a no-op once the path is no longer tracked.
func (t *fileTracker) Open(path params.RelativePath) func() {
	key := string(path)

	t.mu.Lock()
	defer t.mu.Unlock()

	entry, found := t.refMap[key]
	if !found {
		// First reference to this path: start tracking it.
		entry = &tracked{}
		t.refMap[key] = entry
	}
	entry.ref++

	return func() { t.close(path) }
}

// close releases one reference to path. When the last reference is released
// the path stops being tracked and, if a delete was requested in the meantime,
// the delete callback is invoked (with the tracker's mutex still held).
// Closing a path that is not tracked does nothing.
func (t *fileTracker) close(path params.RelativePath) {
	key := string(path)

	t.mu.Lock()
	defer t.mu.Unlock()

	entry, found := t.refMap[key]
	if !found {
		return
	}

	entry.ref--
	if entry.ref != 0 {
		// Other references remain; keep tracking.
		return
	}

	// Last reference released: forget the path and run any deferred delete.
	delete(t.refMap, key)
	if entry.deleted {
		t.delete(path)
	}
}

// Delete requests removal of path. If the path has no outstanding references
// the delete callback is invoked immediately (with the tracker's mutex held);
// otherwise the path is only marked, and the callback runs when the last
// reference is released.
func (t *fileTracker) Delete(path params.RelativePath) {
	t.mu.Lock()
	defer t.mu.Unlock()

	entry, inUse := t.refMap[string(path)]
	if !inUse {
		t.delete(path)
		return
	}
	// Still referenced: defer the removal to the final close.
	entry.deleted = true
}
55 changes: 55 additions & 0 deletions pkg/pyramid/file_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pyramid_test

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/pyramid"
"github.com/treeverse/lakefs/pkg/pyramid/params"
)

// testFileMap is a stand-in for the file system used by TestTracker: a path
// maps to true while the test considers the file present, and
// testTrackerDeleteCB sets it to false when the tracker deletes the path.
var testFileMap map[string]bool

// TestTracker exercises the tracker's reference counting: a delete requested
// while references are outstanding must be deferred until the last reference
// is released, and a delete with no references must run immediately.
func TestTracker(t *testing.T) {
const filename = "test_filename"
file1 := params.RelativePath(filename)
file2 := params.RelativePath("dummy_file")
testFileMap = map[string]bool{}
tracker := pyramid.NewFileTracker(testTrackerDeleteCB)

// Delete a path that was never opened: the callback runs right away.
// Nothing is asserted here; this only checks the call does not panic.
tracker.Delete(file2)

// Delete while references are outstanding: file1 is opened twice and file2
// once, so deleting file1 must only mark it and leave the file in place.
testFileMap[string(file1)] = true
testFileMap[string(file2)] = true
closer := tracker.Open(file1)
_ = tracker.Open(file1)
_ = tracker.Open(file2)
tracker.Delete(file1)
require.True(t, testFileMap[string(file1)])

// Release one of the two file1 references and ensure the file is still not deleted
closer()
require.True(t, testFileMap[string(file1)])

// Release the last file1 reference and ensure the file was deleted.
// The same closer is reused: every closer returned for a path releases one
// reference of that path. file2 is untouched (still referenced, never deleted).
closer()
require.False(t, testFileMap[string(file1)])
require.True(t, testFileMap[string(file2)])

// Sanity: closing a path that is no longer tracked is a no-op
closer()

// Delete after close: once the only reference is released, Delete runs the
// callback immediately.
testFileMap[string(file1)] = true
_ = tracker.Open(file1)
closer()
tracker.Delete(file1)
require.False(t, testFileMap[string(file1)])
require.True(t, testFileMap[string(file2)])
}

// testTrackerDeleteCB is the delete callback handed to the tracker under test.
// It records the deletion by flipping the path's entry in testFileMap to false.
func testTrackerDeleteCB(p params.RelativePath) {
	key := string(p)
	testFileMap[key] = false
}
50 changes: 25 additions & 25 deletions pkg/pyramid/tier_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"

"github.com/google/uuid"

"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/cache"
"github.com/treeverse/lakefs/pkg/logging"
Expand All @@ -26,9 +25,10 @@ type TierFS struct {
logger logging.Logger
adapter block.Adapter

eviction params.Eviction
keyLock cache.OnlyOne
syncDir *directory
eviction params.Eviction
keyLock cache.OnlyOne
syncDir *directory
fileTracker *fileTracker

fsName string

Expand Down Expand Up @@ -57,6 +57,7 @@ func NewFS(c *params.InstanceParams) (FS, error) {
keyLock: cache.NewChanOnlyOne(),
remotePrefix: c.BlockStoragePrefix,
}
tfs.fileTracker = NewFileTracker(tfs.removeFromLocalInternal)
if c.Eviction == nil {
var err error
c.Eviction, err = newRistrettoEviction(c.AllocatedBytes(), tfs.removeFromLocal)
Expand All @@ -83,7 +84,7 @@ func (tfs *TierFS) log(ctx context.Context) logging.Logger {
// It does 2 things:
// 1. Adds stored files to the eviction control
// 2. Remove workspace directories and all its content if it
// exist under the namespace dir.
// exists under the namespace dir.
func (tfs *TierFS) handleExistingFiles() error {
if err := filepath.Walk(tfs.fsLocalBaseDir, func(p string, info os.FileInfo, err error) error {
if err != nil {
Expand Down Expand Up @@ -114,7 +115,8 @@ func (tfs *TierFS) removeFromLocal(rPath params.RelativePath, filesize int64) {
// This will be called by the cache eviction mechanism during entry insert.
// We don't want to wait while the file is being removed from the local disk.
evictionHistograms.WithLabelValues(tfs.fsName).Observe(float64(filesize))
go tfs.removeFromLocalInternal(rPath)
// Notify tracker on delete
tfs.fileTracker.Delete(rPath)
}

func (tfs *TierFS) removeFromLocalInternal(rPath params.RelativePath) {
Expand All @@ -128,10 +130,13 @@ func (tfs *TierFS) removeFromLocalInternal(rPath params.RelativePath) {
return
}

if err := tfs.syncDir.deleteDirRecIfEmpty(path.Dir(string(rPath))); err != nil {
tfs.logger.WithError(err).Error("Failed deleting empty dir")
errorsTotal.WithLabelValues(tfs.fsName, "DirRemoval")
}
// Delete Dir async
go func() {
if err := tfs.syncDir.deleteDirRecIfEmpty(path.Dir(string(rPath))); err != nil {
tfs.logger.WithError(err).Error("Failed deleting empty dir")
errorsTotal.WithLabelValues(tfs.fsName, "DirRemoval")
}
}()
}

func (tfs *TierFS) store(ctx context.Context, namespace, originalPath, nsPath, filename string) error {
Expand Down Expand Up @@ -205,7 +210,7 @@ func (tfs *TierFS) Create(_ context.Context, namespace string) (StoredFile, erro
}, nil
}

// Open returns the a file descriptor to the local file.
// Open returns a file descriptor to the local file.
// If the file is missing from the local disk, it will try to fetch it from the block storage.
func (tfs *TierFS) Open(ctx context.Context, namespace, filename string) (File, error) {
nsPath, err := parseNamespacePath(namespace)
Expand Down Expand Up @@ -256,19 +261,12 @@ func (tfs *TierFS) openFile(ctx context.Context, fileRef localFileRef, fh *os.Fi
}

if !tfs.eviction.Store(fileRef.fsRelativePath, stat.Size()) {
tfs.fileTracker.Delete(fileRef.fsRelativePath)
tfs.log(ctx).WithFields(logging.Fields{
"namespace": fileRef.namespace,
"file": fileRef.filename,
"full_path": fileRef.fullPath,
}).Info("stored file immediately rejected from cache (delete but continue)")

// A rare occurrence, (currently) happens when Ristretto cache is not set up
// to perform any caching. So be less strict: prefer to serve the file and
// delete it from the cache. It will be removed from disk when the last
// surviving file descriptor -- returned from this function -- is closed.
if err := os.Remove(fileRef.fullPath); err != nil {
return nil, err
}
}
return &ROFile{
File: fh,
Expand All @@ -289,7 +287,10 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os.
"fullpath": fileRef.fullPath,
}).Trace("try to lock for open")
}
_, err := tfs.keyLock.Compute(fileRef.filename, func() (interface{}, error) {

closer := tfs.fileTracker.Open(fileRef.fsRelativePath)
defer closer()
fileFullPath, err := tfs.keyLock.Compute(fileRef.filename, func() (interface{}, error) {
// check again file existence, now that we have the lock
_, err := os.Stat(fileRef.fullPath)
if err == nil {
Expand All @@ -301,7 +302,8 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os.
}).Trace("got lock; file exists after all")
}
cacheAccess.WithLabelValues(tfs.fsName, "Hit").Inc()
return nil, nil

return fileRef.fullPath, nil
}
if !os.IsNotExist(err) {
return nil, fmt.Errorf("stat file: %w", err)
Expand Down Expand Up @@ -350,18 +352,16 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os.
return nil, fmt.Errorf("rename temp file: %w", err)
}

return nil, nil
return fileRef.fullPath, nil
})

if err != nil {
return nil, err
}

fh, err := os.Open(fileRef.fullPath)
fh, err := os.Open(fileFullPath.(string))
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}

return fh, nil
}

Expand Down

0 comments on commit d1873a0

Please sign in to comment.