Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include recursive deletion in filer interface #442

Merged
merged 3 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions internal/filer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
assert.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)

// Write should succeed if there is no existing file at the specified path.
err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`))
assert.NoError(t, err)

// Stat on a directory should succeed.
// Note: size and modification time behave differently between WSFS and DBFS.
info, err := f.Stat(ctx, "/foo")
Expand Down Expand Up @@ -97,6 +101,21 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
// Delete should succeed for file that does exist.
err = f.Delete(ctx, "/foo/bar")
assert.NoError(t, err)

// Delete should fail for a non-empty directory.
err = f.Delete(ctx, "/foo")
assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{}))
assert.True(t, errors.Is(err, fs.ErrInvalid))

// Delete should succeed for a non-empty directory if the DeleteRecursively flag is set.
err = f.Delete(ctx, "/foo", filer.DeleteRecursively)
assert.NoError(t, err)

// Delete of the filer root should ALWAYS fail, otherwise subsequent writes would fail.
// It is not in the filer's purview to delete its root directory.
err = f.Delete(ctx, "/")
assert.True(t, errors.As(err, &filer.CannotDeleteRootError{}))
assert.True(t, errors.Is(err, fs.ErrInvalid))
}

func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {
Expand Down
37 changes: 34 additions & 3 deletions libs/filer/dbfs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,17 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) {
return handle, nil
}

func (w *DbfsClient) Delete(ctx context.Context, name string) error {
func (w *DbfsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}

// Illegal to delete the root path.
if absPath == w.root.rootPath {
return CannotDeleteRootError{}
}

// Issue info call before delete because delete succeeds if the specified path doesn't exist.
//
// For discussion: we could decide this is actually convenient, remove the call below,
Expand All @@ -193,10 +198,36 @@ func (w *DbfsClient) Delete(ctx context.Context, name string) error {
return err
}

return w.workspaceClient.Dbfs.Delete(ctx, files.Delete{
recursive := false
if slices.Contains(mode, DeleteRecursively) {
recursive = true
}

err = w.workspaceClient.Dbfs.Delete(ctx, files.Delete{
Path: absPath,
Recursive: false,
Recursive: recursive,
})

// Return early on success.
if err == nil {
return nil
}

// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}

switch aerr.StatusCode {
case http.StatusBadRequest:
// Anecdotally, this error is returned when attempting to delete a non-empty directory.
if aerr.ErrorCode == "IO_ERROR" {
return DirectoryNotEmptyError{absPath}
}
}

return err
}

func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
Expand Down
33 changes: 31 additions & 2 deletions libs/filer/filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ const (
CreateParentDirectories = iota << 1
)

type DeleteMode int

const (
DeleteRecursively DeleteMode = iota
)

type FileAlreadyExistsError struct {
path string
}
Expand Down Expand Up @@ -62,6 +68,29 @@ func (err NotADirectory) Is(other error) bool {
return other == fs.ErrInvalid
}

type DirectoryNotEmptyError struct {
path string
}

func (err DirectoryNotEmptyError) Error() string {
return fmt.Sprintf("directory not empty: %s", err.path)
}

func (err DirectoryNotEmptyError) Is(other error) bool {
return other == fs.ErrInvalid
}

type CannotDeleteRootError struct {
}

func (err CannotDeleteRootError) Error() string {
return "unable to delete filer root"
}

func (err CannotDeleteRootError) Is(other error) bool {
return other == fs.ErrInvalid
}

// Filer is used to access files in a workspace.
// It has implementations for accessing files in WSFS and in DBFS.
type Filer interface {
Expand All @@ -72,8 +101,8 @@ type Filer interface {
// Read file at `path`.
Read(ctx context.Context, path string) (io.Reader, error)

// Delete file at `path`.
Delete(ctx context.Context, path string) error
// Delete file or directory at `path`.
Delete(ctx context.Context, path string, mode ...DeleteMode) error

// Return contents of directory at `path`.
ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error)
Expand Down
2 changes: 1 addition & 1 deletion libs/filer/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (f *fakeFiler) Read(ctx context.Context, p string) (io.Reader, error) {
return strings.NewReader("foo"), nil
}

func (f *fakeFiler) Delete(ctx context.Context, p string) error {
func (f *fakeFiler) Delete(ctx context.Context, p string, mode ...DeleteMode) error {
return fmt.Errorf("not implemented")
}

Expand Down
21 changes: 18 additions & 3 deletions libs/filer/workspace_files_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,25 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader
return nil, err
}

func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error {
func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}

// Illegal to delete the root path.
if absPath == w.root.rootPath {
return CannotDeleteRootError{}
}

recursive := false
if slices.Contains(mode, DeleteRecursively) {
recursive = true
}

err = w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{
Path: absPath,
Recursive: false,
Recursive: recursive,
})

// Return early on success.
Expand All @@ -206,7 +216,12 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error {
return err
}

if aerr.StatusCode == http.StatusNotFound {
switch aerr.StatusCode {
case http.StatusBadRequest:
if aerr.ErrorCode == "DIRECTORY_NOT_EMPTY" {
return DirectoryNotEmptyError{absPath}
}
case http.StatusNotFound:
return FileDoesNotExistError{absPath}
}

Expand Down