Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Pyramid delete before open #4062

Merged
merged 7 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions pkg/pyramid/file_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package pyramid

import (
"sync"

"github.com/treeverse/lakefs/pkg/pyramid/params"
)

// fileTracker tracks file open requests in TierFS to avoid race conditions with cache rejection/eviction
type fileTracker struct {
refMap map[string]*tracked
mu sync.Mutex
delete deleteCallback
}

type deleteCallback func(path params.RelativePath)

type tracked struct {
ref int
deleted bool
}

func NewFileTracker(delete deleteCallback) *fileTracker {
return &fileTracker{
refMap: map[string]*tracked{},
delete: delete,
}
}

func (t *fileTracker) Open(path params.RelativePath) func() {
t.mu.Lock()
defer t.mu.Unlock()
if val, ok := t.refMap[string(path)]; ok {
val.ref++
} else {
t.refMap[string(path)] = &tracked{
ref: 1,
}
}
return func() {
t.close(path)
}
}

func (t *fileTracker) close(path params.RelativePath) {
t.mu.Lock()
defer t.mu.Unlock()
if val, ok := t.refMap[string(path)]; ok {
val.ref--
if val.ref == 0 {
delete(t.refMap, string(path))
if val.deleted {
t.delete(path)
}
}
}
}

func (t *fileTracker) Delete(path params.RelativePath) {
t.mu.Lock()
defer t.mu.Unlock()
if val, ok := t.refMap[string(path)]; ok {
val.deleted = true
} else {
t.delete(path)
}
}
55 changes: 55 additions & 0 deletions pkg/pyramid/file_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package pyramid_test

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/pyramid"
"github.com/treeverse/lakefs/pkg/pyramid/params"
)

var testFileMap map[string]bool

func TestTracker(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome 🤩

const filename = "test_filename"
file1 := params.RelativePath(filename)
file2 := params.RelativePath("dummy_file")
testFileMap = map[string]bool{}
tracker := pyramid.NewFileTracker(testTrackerDeleteCB)

// Delete non existent
tracker.Delete(file2)

// Delete file before close
testFileMap[string(file1)] = true
testFileMap[string(file2)] = true
closer := tracker.Open(file1)
_ = tracker.Open(file1)
_ = tracker.Open(file2)
tracker.Delete(file1)
require.True(t, testFileMap[string(file1)])

// Close one reference and ensure file still not deleted
closer()
require.True(t, testFileMap[string(file1)])

// Close last reference and ensure file was deleted
closer()
require.False(t, testFileMap[string(file1)])
require.True(t, testFileMap[string(file2)])

// Close deleted file - sanity
closer()

// Delete after close
testFileMap[string(file1)] = true
_ = tracker.Open(file1)
closer()
tracker.Delete(file1)
require.False(t, testFileMap[string(file1)])
require.True(t, testFileMap[string(file2)])
}

func testTrackerDeleteCB(p params.RelativePath) {
testFileMap[string(p)] = false
}
50 changes: 25 additions & 25 deletions pkg/pyramid/tier_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"

"github.com/google/uuid"

"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/cache"
"github.com/treeverse/lakefs/pkg/logging"
Expand All @@ -26,9 +25,10 @@ type TierFS struct {
logger logging.Logger
adapter block.Adapter

eviction params.Eviction
keyLock cache.OnlyOne
syncDir *directory
eviction params.Eviction
keyLock cache.OnlyOne
syncDir *directory
fileTracker *fileTracker

fsName string

Expand Down Expand Up @@ -57,6 +57,7 @@ func NewFS(c *params.InstanceParams) (FS, error) {
keyLock: cache.NewChanOnlyOne(),
remotePrefix: c.BlockStoragePrefix,
}
tfs.fileTracker = NewFileTracker(tfs.removeFromLocalInternal)
if c.Eviction == nil {
var err error
c.Eviction, err = newRistrettoEviction(c.AllocatedBytes(), tfs.removeFromLocal)
Expand All @@ -83,7 +84,7 @@ func (tfs *TierFS) log(ctx context.Context) logging.Logger {
// It does 2 things:
// 1. Adds stored files to the eviction control
// 2. Remove workspace directories and all its content if it
// exist under the namespace dir.
// exists under the namespace dir.
func (tfs *TierFS) handleExistingFiles() error {
if err := filepath.Walk(tfs.fsLocalBaseDir, func(p string, info os.FileInfo, err error) error {
if err != nil {
Expand Down Expand Up @@ -114,7 +115,8 @@ func (tfs *TierFS) removeFromLocal(rPath params.RelativePath, filesize int64) {
// This will be called by the cache eviction mechanism during entry insert.
// We don't want to wait while the file is being removed from the local disk.
evictionHistograms.WithLabelValues(tfs.fsName).Observe(float64(filesize))
go tfs.removeFromLocalInternal(rPath)
// Notify tracker on delete
tfs.fileTracker.Delete(rPath)
}

func (tfs *TierFS) removeFromLocalInternal(rPath params.RelativePath) {
Expand All @@ -128,10 +130,13 @@ func (tfs *TierFS) removeFromLocalInternal(rPath params.RelativePath) {
return
}

if err := tfs.syncDir.deleteDirRecIfEmpty(path.Dir(string(rPath))); err != nil {
tfs.logger.WithError(err).Error("Failed deleting empty dir")
errorsTotal.WithLabelValues(tfs.fsName, "DirRemoval")
}
// Delete Dir async
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we changing it to async?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removeFromLocalInternal was called asynchronously to not delay the flow on the delete dir operation. Now that it is called out of context, I've modified only the call to delete dir to be perfomed async

if err := tfs.syncDir.deleteDirRecIfEmpty(path.Dir(string(rPath))); err != nil {
tfs.logger.WithError(err).Error("Failed deleting empty dir")
errorsTotal.WithLabelValues(tfs.fsName, "DirRemoval")
}
}()
}

func (tfs *TierFS) store(ctx context.Context, namespace, originalPath, nsPath, filename string) error {
Expand Down Expand Up @@ -205,7 +210,7 @@ func (tfs *TierFS) Create(_ context.Context, namespace string) (StoredFile, erro
}, nil
}

// Open returns the a file descriptor to the local file.
// Open returns a file descriptor to the local file.
// If the file is missing from the local disk, it will try to fetch it from the block storage.
func (tfs *TierFS) Open(ctx context.Context, namespace, filename string) (File, error) {
nsPath, err := parseNamespacePath(namespace)
Expand Down Expand Up @@ -256,19 +261,12 @@ func (tfs *TierFS) openFile(ctx context.Context, fileRef localFileRef, fh *os.Fi
}

if !tfs.eviction.Store(fileRef.fsRelativePath, stat.Size()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OnReject is called also in case method returns false?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both OnEvict and OnReject CB are configured to run the same OnEvict method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right - but for a rejected file you will call tfs.fileTracker.Delete(rPath) twice. Here and in the OnReject callback (which calls tfs.removeFromLocal)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I see onReject is called only in processItems (NewCache, Clear)
This flow is for Set

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now sure I understand. onReject is called whenever a file isn't being stored (decided by their policy) - so here we would definitely call Delete twice for a rejected file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ristrettoEviction Store calls cache Set which in turn calls SetWithTTL.
In the SetWithTTL flow there are several places where the function return false without calling onReject CB.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right - onReject only gets called when the eviction policy decides to reject. All other rejections are not called.

tfs.fileTracker.Delete(fileRef.fsRelativePath)
tfs.log(ctx).WithFields(logging.Fields{
"namespace": fileRef.namespace,
"file": fileRef.filename,
"full_path": fileRef.fullPath,
}).Info("stored file immediately rejected from cache (delete but continue)")

// A rare occurrence, (currently) happens when Ristretto cache is not set up
// to perform any caching. So be less strict: prefer to serve the file and
// delete it from the cache. It will be removed from disk when the last
// surviving file descriptor -- returned from this function -- is closed.
if err := os.Remove(fileRef.fullPath); err != nil {
return nil, err
}
}
return &ROFile{
File: fh,
Expand All @@ -289,7 +287,10 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os.
"fullpath": fileRef.fullPath,
}).Trace("try to lock for open")
}
_, err := tfs.keyLock.Compute(fileRef.filename, func() (interface{}, error) {

closer := tfs.fileTracker.Open(fileRef.fsRelativePath)
defer closer()
fileFullPath, err := tfs.keyLock.Compute(fileRef.filename, func() (interface{}, error) {
// check again file existence, now that we have the lock
_, err := os.Stat(fileRef.fullPath)
if err == nil {
Expand All @@ -301,7 +302,8 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os.
}).Trace("got lock; file exists after all")
}
cacheAccess.WithLabelValues(tfs.fsName, "Hit").Inc()
return nil, nil

return fileRef.fullPath, nil
}
if !os.IsNotExist(err) {
return nil, fmt.Errorf("stat file: %w", err)
Expand Down Expand Up @@ -350,18 +352,16 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os.
return nil, fmt.Errorf("rename temp file: %w", err)
}

return nil, nil
return fileRef.fullPath, nil
})

if err != nil {
return nil, err
}

fh, err := os.Open(fileRef.fullPath)
fh, err := os.Open(fileFullPath.(string))
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}

return fh, nil
}

Expand Down