Skip to content

Commit

Permalink
Rename fs level integration - 1 (#2259)
Browse files Browse the repository at this point in the history
* create common functions to use it for rename folder

* review comments

* review comments

* fix review comments

* fix review comments

* test fix

* test fix

* nit

* review comment
  • Loading branch information
Tulsishah authored Aug 5, 2024
1 parent 3d4d665 commit d5c371c
Showing 1 changed file with 50 additions and 32 deletions.
82 changes: 50 additions & 32 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1972,6 +1972,44 @@ func (fs *fileSystem) renameFile(
return nil
}

func (fs *fileSystem) releaseInodes(inodes *[]inode.DirInode) {
for _, in := range *inodes {
fs.unlockAndDecrementLookupCount(in, 1)
}
*inodes = []inode.DirInode{}
}

func (fs *fileSystem) getBucketDirInode(ctx context.Context, parent inode.DirInode, name string) (inode.BucketOwnedDirInode, error) {
dir, err := fs.lookUpOrCreateChildDirInode(ctx, parent, name)
if err != nil {
return nil, fmt.Errorf("lookup directory: %w", err)
}
return dir, nil
}

func (fs *fileSystem) ensureNoOpenFilesInDirectory(dir inode.BucketOwnedDirInode, name string) error {
fs.mu.Lock()
entries := dir.LocalFileEntries(fs.localFileInodes)
fs.mu.Unlock()

if len(entries) != 0 {
return fmt.Errorf("can't rename directory %s with open files: %w", name, syscall.ENOTSUP)
}
return nil
}

func (fs *fileSystem) checkDirNotEmpty(dir inode.BucketOwnedDirInode, name string) error {
unexpected, err := dir.ReadDescendants(context.Background(), 1)
if err != nil {
return fmt.Errorf("read descendants of the new directory %q: %w", name, err)
}

if len(unexpected) > 0 {
return fuse.ENOTEMPTY
}
return nil
}

// Rename an old directory to a new directory. If the new directory already
// exists and is non-empty, return ENOTEMPTY.
//
Expand All @@ -1989,27 +2027,16 @@ func (fs *fileSystem) renameDir(
// lookUpOrCreateChildInode (since the pending inodes are not sent back to
// the kernel) and unlocks the pending inodes, but only once
var pendingInodes []inode.DirInode
releaseInodes := func() {
for _, in := range pendingInodes {
fs.unlockAndDecrementLookupCount(in, 1)
}
pendingInodes = []inode.DirInode{}
}
defer releaseInodes()
defer fs.releaseInodes(&pendingInodes)

// Get the inode of the old directory
oldDir, err := fs.lookUpOrCreateChildDirInode(ctx, oldParent, oldName)
oldDir, err := fs.getBucketDirInode(ctx, oldParent, oldName)
if err != nil {
return fmt.Errorf("lookup old directory: %w", err)
return err
}
pendingInodes = append(pendingInodes, oldDir)

// If old directory contains local (un-synced) files, rename operation is not supported.
fs.mu.Lock()
entries := oldDir.LocalFileEntries(fs.localFileInodes)
fs.mu.Unlock()
if len(entries) != 0 {
return fmt.Errorf("can't rename directory %s with open files: %w", oldName, syscall.ENOTSUP)
if err = fs.ensureNoOpenFilesInDirectory(oldDir, oldName); err != nil {
return err
}

// Fetch all the descendants of the old directory recursively
Expand All @@ -2035,27 +2062,19 @@ func (fs *fileSystem) renameDir(
}
}

// Get the inode of the new directory
newDir, err := fs.lookUpOrCreateChildDirInode(ctx, newParent, newName)
newDir, err := fs.getBucketDirInode(ctx, newParent, newName)
if err != nil {
return fmt.Errorf("lookup new directory: %w", err)
return err
}
pendingInodes = append(pendingInodes, newDir)

// Fail the operation if the new directory is non-empty.
unexpected, err := newDir.ReadDescendants(ctx, 1)
if err != nil {
return fmt.Errorf("read descendants of the new directory %q: %w", newName, err)
}
if len(unexpected) > 0 {
return fuse.ENOTEMPTY
if err = fs.checkDirNotEmpty(newDir, newName); err != nil {
return err
}

// Move all the files from the old directory to the new directory, keeping
// both directories locked.
// Move all the files from the old directory to the new directory, keeping both directories locked.
for _, descendant := range descendants {
nameDiff := strings.TrimPrefix(
descendant.FullName.GcsObjectName(), oldDir.Name().GcsObjectName())
nameDiff := strings.TrimPrefix(descendant.FullName.GcsObjectName(), oldDir.Name().GcsObjectName())
if nameDiff == descendant.FullName.GcsObjectName() {
return fmt.Errorf("unwanted descendant %q not from dir %q", descendant.FullName, oldDir.Name())
}
Expand All @@ -2073,8 +2092,7 @@ func (fs *fileSystem) renameDir(
}
}

// We are done with both directories.
releaseInodes()
fs.releaseInodes(&pendingInodes)

// Delete the backing object of the old directory.
fs.mu.Lock()
Expand Down

0 comments on commit d5c371c

Please sign in to comment.