Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete objects as part of Graveler #4205

Merged
merged 4 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions api/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2960,8 +2960,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ObjectErrorList"
204:
description: all requested objects successfully deleted
401:
$ref: "#/components/responses/Unauthorized"
404:
Expand Down
2 changes: 0 additions & 2 deletions clients/java/api/openapi.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion clients/java/docs/ObjectsApi.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion clients/python/docs/ObjectsApi.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 21 additions & 15 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,16 @@ func (c *Controller) DeleteObjects(w http.ResponseWriter, r *http.Request, body

// limit check
if len(body.Paths) > DefaultMaxDeleteObjects {
writeError(w, http.StatusInternalServerError, fmt.Errorf("%w, max paths is set to %d",
ErrRequestSizeExceeded, DefaultMaxDeleteObjects))
err := fmt.Errorf("%w, max paths is set to %d", ErrRequestSizeExceeded, DefaultMaxDeleteObjects)
writeError(w, http.StatusInternalServerError, err)
return
}

// delete all the files and collect responses
var errs []ObjectError
// errs used to collect errors as part of the response, can't be nil
errs := make([]ObjectError, 0)
itaiad200 marked this conversation as resolved.
Show resolved Hide resolved
// check if we authorize to delete each object, prepare a list of paths we can delete
var pathsToDelete []string
for _, objectPath := range body.Paths {
// authorize this object deletion
if !c.authorize(w, r, permissions.Node{
Permission: permissions.Permission{
Action: permissions.DeleteObjectAction,
Expand All @@ -126,14 +127,24 @@ func (c *Controller) DeleteObjects(w http.ResponseWriter, r *http.Request, body
StatusCode: http.StatusUnauthorized,
Message: http.StatusText(http.StatusUnauthorized),
})
continue
} else {
pathsToDelete = append(pathsToDelete, objectPath)
}
}

// batch delete the entries we allow to delete
delErr := c.Catalog.DeleteEntries(ctx, repository, branch, pathsToDelete)
delErrs := graveler.NewMapDeleteErrors(delErr)
for _, objectPath := range pathsToDelete {
Comment on lines +136 to +138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you iterating the pathsToDelete, rather than the actual errors?
What should happen if all deletions in batch are successful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the first part of the function we verified authorization - each file that we do not have access we added an error or added to 'pathsToDelete'.
The list 'pathsToDelete' represents all the files we need to delete (errors will include why we can't delete the rest).
Part of the protocol we need to report (optional - quiet flag) which files we deleted or not.
So going over the files it is to either report a successful delete or the specific/general error why it is not.

// set err to the specific error when possible
err := delErrs[objectPath]
if err == nil {
err = delErr
}
lg := c.Logger.WithField("path", objectPath)
err := c.Catalog.DeleteEntry(ctx, repository, branch, objectPath)
switch {
case errors.Is(err, catalog.ErrNotFound):
lg.Debug("tried to delete a non-existent object")
case errors.Is(err, catalog.ErrNotFound), errors.Is(err, graveler.ErrNotFound):
lg.WithError(err).Debug("tried to delete a non-existent object")
case errors.Is(err, graveler.ErrWriteToProtectedBranch):
errs = append(errs, ObjectError{
Path: StringPtr(objectPath),
Expand All @@ -156,12 +167,7 @@ func (c *Controller) DeleteObjects(w http.ResponseWriter, r *http.Request, body
lg.Debug("object set for deletion")
}
}
// no content in case there are no errors
if len(errs) == 0 {
writeResponse(w, http.StatusNoContent, nil)
return
}
// status ok with list of errors

response := ObjectErrorList{
Errors: errs,
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"text/template"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/go-test/deep"
nanoid "github.com/matoous/go-nanoid/v2"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1814,8 +1815,11 @@ func TestController_ObjectsDeleteObjectHandler(t *testing.T) {
// delete objects
delResp, err := clt.DeleteObjectsWithResponse(ctx, repo, branch, api.DeleteObjectsJSONRequestBody{Paths: paths})
verifyResponseOK(t, delResp, err)
if delResp.StatusCode() != http.StatusNoContent {
t.Errorf("DeleteObjects should return 204 (no content) for successful delete, got %d", delResp.StatusCode())
if delResp.JSON200 == nil {
t.Errorf("DeleteObjects should return 200 for successful delete, got status code %d", delResp.StatusCode())
}
if len(delResp.JSON200.Errors) > 0 {
t.Errorf("DeleteObjects (round 2) should have no errors, got %v", delResp.JSON200.Errors)
}

// check objects no longer there
Expand All @@ -1830,6 +1834,16 @@ func TestController_ObjectsDeleteObjectHandler(t *testing.T) {
t.Fatalf("expected file to be gone now for '%s'", p)
}
}

// delete objects again - make sure we do not fail or get any error
delResp2, err := clt.DeleteObjectsWithResponse(ctx, repo, branch, api.DeleteObjectsJSONRequestBody{Paths: paths})
verifyResponseOK(t, delResp2, err)
if delResp2.JSON200 == nil {
t.Errorf("DeleteObjects (round 2) should return 200 for successful delete, got status code %d", delResp2.StatusCode())
}
if len(delResp2.JSON200.Errors) > 0 {
t.Errorf("DeleteObjects (round 2) should have no errors, got %s", spew.Sdump(delResp2.JSON200.Errors))
}
})

t.Run("delete objects request size", func(t *testing.T) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,34 @@ func (c *Catalog) DeleteEntry(ctx context.Context, repositoryID string, branch s
return c.Store.Delete(ctx, repository, branchID, key)
}

func (c *Catalog) DeleteEntries(ctx context.Context, repositoryID string, branch string, paths []string) error {
branchID := graveler.BranchID(branch)
if err := validator.Validate([]validator.ValidateArg{
{Name: "repository", Value: repositoryID, Fn: graveler.ValidateRepositoryID},
{Name: "branch", Value: branchID, Fn: graveler.ValidateBranchID},
}); err != nil {
return err
}
// validate path
for i, path := range paths {
p := Path(path)
if err := ValidatePath(p); err != nil {
return fmt.Errorf("argument path[%d]: %w", i, err)
}
}

repository, err := c.getRepository(ctx, repositoryID)
if err != nil {
return err
}

keys := make([]graveler.Key, len(paths))
for i := range paths {
keys[i] = graveler.Key(paths[i])
}
return c.Store.DeleteBatch(ctx, repository, branchID, keys)
}

func (c *Catalog) ListEntries(ctx context.Context, repositoryID string, reference string, prefix string, after string, delimiter string, limit int) ([]*DBEntry, bool, error) {
// normalize limit
if limit < 0 || limit > ListEntriesLimitMax {
Expand Down
6 changes: 5 additions & 1 deletion pkg/catalog/fake_graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ func (g *FakeGraveler) Set(_ context.Context, repository *graveler.RepositoryRec
}

func (g *FakeGraveler) Delete(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key) error {
panic("implement me")
return nil
}

func (g *FakeGraveler) DeleteBatch(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, keys []graveler.Key) error {
return nil
}

func (g *FakeGraveler) List(_ context.Context, _ *graveler.RepositoryRecord, _ graveler.Ref) (graveler.ValueIterator, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/catalog/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type Interface interface {
GetEntry(ctx context.Context, repository, reference string, path string, params GetEntryParams) (*DBEntry, error)
CreateEntry(ctx context.Context, repository, branch string, entry DBEntry, writeConditions ...graveler.WriteConditionOption) error
DeleteEntry(ctx context.Context, repository, branch string, path string) error
DeleteEntries(ctx context.Context, repository, branch string, paths []string) error
ListEntries(ctx context.Context, repository, reference string, prefix, after string, delimiter string, limit int) ([]*DBEntry, bool, error)
ResetEntry(ctx context.Context, repository, branch string, path string) error
ResetEntries(ctx context.Context, repository, branch string, prefix string) error
Expand Down
Loading