Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for put-if-absent operations #1823

Merged
merged 2 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions api/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ components:
application/json:
schema:
$ref: "#/components/schemas/Error"
PreconditionFailed:
description: Precondition Failed
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
ValidationError:
description: Validation Error
content:
Expand Down Expand Up @@ -2383,6 +2389,14 @@ paths:
required: false
schema:
type: string
- in: header
name: If-None-Match
description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet
example: "*"
required: false
schema:
type: string
pattern: '^\*$' # Currently, only "*" is supported
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nopcoder I hope our generated clients don't validate. Otherwise when we add other values, an old client library won't work with an application that wants to use a non-* value.

responses:
201:
description: object metadata
Expand All @@ -2396,6 +2410,8 @@ paths:
$ref: "#/components/responses/Unauthorized"
404:
$ref: "#/components/responses/NotFound"
412:
$ref: "#/components/responses/PreconditionFailed"
default:
$ref: "#/components/responses/ServerError"
delete:
Expand Down
5 changes: 4 additions & 1 deletion clients/python/docs/ObjectsApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ with lakefs_client.ApiClient(configuration) as api_client:
branch = "branch_example" # str |
path = "path_example" # str |
storage_class = "storageClass_example" # str | (optional)
if_none_match = "*" # str | Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet (optional)
content = open('/path/to/file', 'rb') # file_type | Object content to upload (optional)

# example passing only required values which don't have defaults set
Expand All @@ -680,7 +681,7 @@ with lakefs_client.ApiClient(configuration) as api_client:
# example passing only required values which don't have defaults set
# and optional values
try:
api_response = api_instance.upload_object(repository, branch, path, storage_class=storage_class, content=content)
api_response = api_instance.upload_object(repository, branch, path, storage_class=storage_class, if_none_match=if_none_match, content=content)
pprint(api_response)
except lakefs_client.ApiException as e:
print("Exception when calling ObjectsApi->upload_object: %s\n" % e)
Expand All @@ -695,6 +696,7 @@ Name | Type | Description | Notes
**branch** | **str**| |
**path** | **str**| |
**storage_class** | **str**| | [optional]
**if_none_match** | **str**| Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet | [optional]
**content** | **file_type**| Object content to upload | [optional]

### Return type
Expand All @@ -718,6 +720,7 @@ Name | Type | Description | Notes
**400** | Validation Error | - |
**401** | Unauthorized | - |
**404** | Resource Not Found | - |
**412** | Precondition Failed | - |
**0** | Internal Server Error | - |

[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)
Expand Down
13 changes: 13 additions & 0 deletions clients/python/lakefs_client/api/objects_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,7 @@ def __upload_object(

Keyword Args:
storage_class (str): [optional]
if_none_match (str): Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet. [optional]
content (file_type): Object content to upload. [optional]
_return_http_data_only (bool): response data without head status
code and headers. Default is True.
Expand Down Expand Up @@ -1007,6 +1008,7 @@ def __upload_object(
'branch',
'path',
'storage_class',
'if_none_match',
'content',
],
'required': [
Expand All @@ -1019,10 +1021,17 @@ def __upload_object(
'enum': [
],
'validation': [
'if_none_match',
]
},
root_map={
'validations': {
('if_none_match',): {

'regex': {
'pattern': r'^\*$', # noqa: E501
},
},
},
'allowed_values': {
},
Expand All @@ -1035,6 +1044,8 @@ def __upload_object(
(str,),
'storage_class':
(str,),
'if_none_match':
(str,),
'content':
(file_type,),
},
Expand All @@ -1043,13 +1054,15 @@ def __upload_object(
'branch': 'branch',
'path': 'path',
'storage_class': 'storageClass',
'if_none_match': 'If-None-Match',
'content': 'content',
},
'location_map': {
'repository': 'path',
'branch': 'path',
'path': 'query',
'storage_class': 'query',
'if_none_match': 'header',
'content': 'form',
},
'collection_format_map': {
Expand Down
16 changes: 16 additions & 0 deletions docs/assets/js/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ components:
application/json:
schema:
$ref: "#/components/schemas/Error"
PreconditionFailed:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note these are the docs

description: Precondition Failed
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
ValidationError:
description: Validation Error
content:
Expand Down Expand Up @@ -2383,6 +2389,14 @@ paths:
required: false
schema:
type: string
- in: header
name: If-None-Match
description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet
example: "*"
required: false
schema:
type: string
pattern: '^\*$' # Currently, only "*" is supported
responses:
201:
description: object metadata
Expand All @@ -2396,6 +2410,8 @@ paths:
$ref: "#/components/responses/Unauthorized"
404:
$ref: "#/components/responses/NotFound"
412:
$ref: "#/components/responses/PreconditionFailed"
default:
$ref: "#/components/responses/ServerError"
delete:
Expand Down
29 changes: 28 additions & 1 deletion pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,28 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi
return
}

// before writing body, ensure preconditions - this means we essentially check for object existence twice:
// once before uploading the body to save resources and time,
// and then graveler will check again when passed a WriteCondition.
allowOverwrite := true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Swagger validator should catch invalid value for IfNoneMatch - part of the JSON schema validation (if not ignore this comment)
  2. Why do we need to write code at this level to verify the existence and not just pass the boolean value to the underlying catalog? if WriteCondition members were public this will be easier to pass in this case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to test this specific condition outside of Graveler - why wait for the user to upload the body only to fail at CreateEntry if I already know this write is going to fail?

if params.IfNoneMatch != nil {
if StringValue(params.IfNoneMatch) != "*" {
writeError(w, http.StatusBadRequest, "Unsupported value for If-None-Match - Only \"*\" is supported")
return
}
// check if exists
_, err := c.Catalog.GetEntry(ctx, repo.Name, branch, params.Path, catalog.GetEntryParams{ReturnExpired: true})
if err == nil {
writeError(w, http.StatusPreconditionFailed, "path already exists")
return
}
if !errors.Is(err, catalog.ErrNotFound) {
writeError(w, http.StatusInternalServerError, err)
return
}
allowOverwrite = false
}

// write the content
file, handler, err := r.FormFile("content")
if err != nil {
Expand Down Expand Up @@ -1714,7 +1736,12 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi
Size: blob.Size,
Checksum: blob.Checksum,
}
err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry)

err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, graveler.IfAbsent(!allowOverwrite))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, double-checked locking actually works here! (The previous test is only an optimization, but still nice to know.)

if errors.Is(err, graveler.ErrPreconditionFailed) {
writeError(w, http.StatusPreconditionFailed, "path already exists")
return
}
if handleAPIError(w, err) {
return
}
Expand Down
111 changes: 111 additions & 0 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,117 @@ func uploadObjectHelper(t testing.TB, ctx context.Context, clt api.ClientWithRes
}, w.FormDataContentType(), &b)
}

func writeMultipart(fieldName, filename, content string) (string, io.Reader) {
var buf bytes.Buffer
mpw := multipart.NewWriter(&buf)
w, _ := mpw.CreateFormFile("content", "bar")
_, _ = w.Write([]byte("hello world!"))
_ = mpw.Close()
return mpw.FormDataContentType(), &buf
}

func TestController_UploadObject(t *testing.T) {
clt, deps := setupClientWithAdmin(t, "")
ctx := context.Background()

_, err := deps.catalog.CreateRepository(ctx, "my-new-repo", onBlock(deps, "foo1"), "main")
testutil.Must(t, err)

t.Run("upload object", func(t *testing.T) {
// write
contentType, buf := writeMultipart("content", "bar", "hello world!")
b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/bar",
}, contentType, buf)

testutil.Must(t, err)
if b.StatusCode() == 500 {
t.Fatalf("got 500 while uploading: %v", b.JSONDefault)
}
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}
})

t.Run("overwrite", func(t *testing.T) {
// write first
contentType, buf := writeMultipart("content", "baz1", "hello world!")
b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/baz1",
}, contentType, buf)
testutil.Must(t, err)
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}
// overwrite
contentType, buf = writeMultipart("content", "baz1", "something else!")
b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/baz1",
}, contentType, buf)

testutil.Must(t, err)
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}
})

t.Run("disable overwrite with if-none-match (uncommitted entry)", func(t *testing.T) {
// write first
contentType, buf := writeMultipart("content", "baz2", "hello world!")
b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/baz2",
}, contentType, buf)
testutil.Must(t, err)
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}
// overwrite
contentType, buf = writeMultipart("content", "baz2", "something else!")
all := "*"
b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/baz2",
IfNoneMatch: &all,
}, contentType, buf)

testutil.Must(t, err)
if b.StatusCode() != 412 {
t.Fatalf("expected 412 for UploadObject, got %d", b.StatusCode())
}
})

t.Run("disable overwrite with if-none-match (committed entry)", func(t *testing.T) {
_, err := deps.catalog.CreateBranch(ctx, "my-new-repo", "another-branch", "main")
testutil.Must(t, err)

// write first
contentType, buf := writeMultipart("content", "baz3", "hello world!")
b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "another-branch", &api.UploadObjectParams{
Path: "foo/baz3",
}, contentType, buf)
testutil.Must(t, err)
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}

// commit
_, err = deps.catalog.Commit(ctx, "my-new-repo", "another-branch", "a commit!", "user1", nil)
testutil.Must(t, err)

// overwrite after commit
all := "*"
contentType, buf = writeMultipart("content", "baz3", "something else!")
b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "another-branch", &api.UploadObjectParams{
Path: "foo/baz3",
IfNoneMatch: &all,
}, contentType, buf)

testutil.Must(t, err)
if b.StatusCode() != 412 {
t.Fatalf("expected 412 for UploadObject, got %d", b.StatusCode())
}
})
}

func TestController_DeleteBranchHandler(t *testing.T) {
clt, deps := setupClientWithAdmin(t, "")
ctx := context.Background()
Expand Down
4 changes: 2 additions & 2 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func addressTypeToCatalog(t Entry_AddressType) AddressType {
}
}

func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch string, entry DBEntry) error {
func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch string, entry DBEntry, writeConditions ...graveler.WriteConditionOption) error {
repositoryID := graveler.RepositoryID(repository)
branchID := graveler.BranchID(branch)
ent := EntryFromCatalogEntry(entry)
Expand All @@ -671,7 +671,7 @@ func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch str
if err != nil {
return err
}
return c.Store.Set(ctx, repositoryID, branchID, key, *value)
return c.Store.Set(ctx, repositoryID, branchID, key, *value, writeConditions...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: no real reason to have varargs here, this API only ever gets generated slices (here on the server side). varargs now means we cannot varargs later, when we might really want it.

}

func (c *Catalog) CreateEntries(ctx context.Context, repository string, branch string, entries []DBEntry) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/fake_graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (g *FakeGraveler) Get(_ context.Context, repositoryID graveler.RepositoryID
return v, nil
}

func (g *FakeGraveler) Set(_ context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, key graveler.Key, value graveler.Value) error {
func (g *FakeGraveler) Set(_ context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, key graveler.Key, value graveler.Value, _ ...graveler.WriteConditionOption) error {
if g.Err != nil {
return g.Err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type Interface interface {
// GetEntry returns the current entry for path in repository branch reference. Returns
// the entry with ExpiredError if it has expired from underlying storage.
GetEntry(ctx context.Context, repository, reference string, path string, params GetEntryParams) (*DBEntry, error)
CreateEntry(ctx context.Context, repository, branch string, entry DBEntry) error
CreateEntry(ctx context.Context, repository, branch string, entry DBEntry, writeConditions ...graveler.WriteConditionOption) error
CreateEntries(ctx context.Context, repository, branch string, entries []DBEntry) error
DeleteEntry(ctx context.Context, repository, branch string, path string) error
ListEntries(ctx context.Context, repository, reference string, prefix, after string, delimiter string, limit int) ([]*DBEntry, bool, error)
Expand Down
1 change: 1 addition & 0 deletions pkg/graveler/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
// TODO(ariels): Wrap with ErrUserVisible once db is gone.
ErrNotFound = wrapError(db.ErrNotFound, "not found")
ErrNotUnique = errors.New("not unique")
ErrPreconditionFailed = errors.New("precondition failed")
ErrInvalidValue = errors.New("invalid value")
ErrInvalidMergeBase = fmt.Errorf("only 2 commits allowed in FindMergeBase: %w", ErrInvalidValue)
ErrNoMergeBase = errors.New("no merge base")
Expand Down
Loading