Skip to content

Commit

Permalink
added support for put-if-absent operations (#1823)
Browse files Browse the repository at this point in the history
  • Loading branch information
ozkatz authored Apr 27, 2021
1 parent 8024952 commit 7e611ed
Show file tree
Hide file tree
Showing 14 changed files with 271 additions and 29 deletions.
16 changes: 16 additions & 0 deletions api/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ components:
application/json:
schema:
$ref: "#/components/schemas/Error"
PreconditionFailed:
description: Precondition Failed
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
ValidationError:
description: Validation Error
content:
Expand Down Expand Up @@ -2383,6 +2389,14 @@ paths:
required: false
schema:
type: string
- in: header
name: If-None-Match
description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet
example: "*"
required: false
schema:
type: string
pattern: '^\*$' # Currently, only "*" is supported
responses:
201:
description: object metadata
Expand All @@ -2396,6 +2410,8 @@ paths:
$ref: "#/components/responses/Unauthorized"
404:
$ref: "#/components/responses/NotFound"
412:
$ref: "#/components/responses/PreconditionFailed"
default:
$ref: "#/components/responses/ServerError"
delete:
Expand Down
5 changes: 4 additions & 1 deletion clients/python/docs/ObjectsApi.md
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ with lakefs_client.ApiClient(configuration) as api_client:
branch = "branch_example" # str |
path = "path_example" # str |
storage_class = "storageClass_example" # str | (optional)
if_none_match = "*" # str | Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet (optional)
content = open('/path/to/file', 'rb') # file_type | Object content to upload (optional)

# example passing only required values which don't have defaults set
Expand All @@ -680,7 +681,7 @@ with lakefs_client.ApiClient(configuration) as api_client:
# example passing only required values which don't have defaults set
# and optional values
try:
api_response = api_instance.upload_object(repository, branch, path, storage_class=storage_class, content=content)
api_response = api_instance.upload_object(repository, branch, path, storage_class=storage_class, if_none_match=if_none_match, content=content)
pprint(api_response)
except lakefs_client.ApiException as e:
print("Exception when calling ObjectsApi->upload_object: %s\n" % e)
Expand All @@ -695,6 +696,7 @@ Name | Type | Description | Notes
**branch** | **str**| |
**path** | **str**| |
**storage_class** | **str**| | [optional]
**if_none_match** | **str**| Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet | [optional]
**content** | **file_type**| Object content to upload | [optional]

### Return type
Expand All @@ -718,6 +720,7 @@ Name | Type | Description | Notes
**400** | Validation Error | - |
**401** | Unauthorized | - |
**404** | Resource Not Found | - |
**412** | Precondition Failed | - |
**0** | Internal Server Error | - |

[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)
Expand Down
13 changes: 13 additions & 0 deletions clients/python/lakefs_client/api/objects_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,7 @@ def __upload_object(
Keyword Args:
storage_class (str): [optional]
if_none_match (str): Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet. [optional]
content (file_type): Object content to upload. [optional]
_return_http_data_only (bool): response data without head status
code and headers. Default is True.
Expand Down Expand Up @@ -1007,6 +1008,7 @@ def __upload_object(
'branch',
'path',
'storage_class',
'if_none_match',
'content',
],
'required': [
Expand All @@ -1019,10 +1021,17 @@ def __upload_object(
'enum': [
],
'validation': [
'if_none_match',
]
},
root_map={
'validations': {
('if_none_match',): {

'regex': {
'pattern': r'^\*$', # noqa: E501
},
},
},
'allowed_values': {
},
Expand All @@ -1035,6 +1044,8 @@ def __upload_object(
(str,),
'storage_class':
(str,),
'if_none_match':
(str,),
'content':
(file_type,),
},
Expand All @@ -1043,13 +1054,15 @@ def __upload_object(
'branch': 'branch',
'path': 'path',
'storage_class': 'storageClass',
'if_none_match': 'If-None-Match',
'content': 'content',
},
'location_map': {
'repository': 'path',
'branch': 'path',
'path': 'query',
'storage_class': 'query',
'if_none_match': 'header',
'content': 'form',
},
'collection_format_map': {
Expand Down
16 changes: 16 additions & 0 deletions docs/assets/js/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ components:
application/json:
schema:
$ref: "#/components/schemas/Error"
PreconditionFailed:
description: Precondition Failed
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
ValidationError:
description: Validation Error
content:
Expand Down Expand Up @@ -2383,6 +2389,14 @@ paths:
required: false
schema:
type: string
- in: header
name: If-None-Match
description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet
example: "*"
required: false
schema:
type: string
pattern: '^\*$' # Currently, only "*" is supported
responses:
201:
description: object metadata
Expand All @@ -2396,6 +2410,8 @@ paths:
$ref: "#/components/responses/Unauthorized"
404:
$ref: "#/components/responses/NotFound"
412:
$ref: "#/components/responses/PreconditionFailed"
default:
$ref: "#/components/responses/ServerError"
delete:
Expand Down
29 changes: 28 additions & 1 deletion pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,28 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi
return
}

// before writing body, ensure preconditions - this means we essentially check for object existence twice:
// once before uploading the body to save resources and time,
// and then graveler will check again when passed a WriteCondition.
allowOverwrite := true
if params.IfNoneMatch != nil {
if StringValue(params.IfNoneMatch) != "*" {
writeError(w, http.StatusBadRequest, "Unsupported value for If-None-Match - Only \"*\" is supported")
return
}
// check if exists
_, err := c.Catalog.GetEntry(ctx, repo.Name, branch, params.Path, catalog.GetEntryParams{ReturnExpired: true})
if err == nil {
writeError(w, http.StatusPreconditionFailed, "path already exists")
return
}
if !errors.Is(err, catalog.ErrNotFound) {
writeError(w, http.StatusInternalServerError, err)
return
}
allowOverwrite = false
}

// write the content
file, handler, err := r.FormFile("content")
if err != nil {
Expand Down Expand Up @@ -1714,7 +1736,12 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi
Size: blob.Size,
Checksum: blob.Checksum,
}
err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry)

err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, graveler.IfAbsent(!allowOverwrite))
if errors.Is(err, graveler.ErrPreconditionFailed) {
writeError(w, http.StatusPreconditionFailed, "path already exists")
return
}
if handleAPIError(w, err) {
return
}
Expand Down
111 changes: 111 additions & 0 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,117 @@ func uploadObjectHelper(t testing.TB, ctx context.Context, clt api.ClientWithRes
}, w.FormDataContentType(), &b)
}

func writeMultipart(fieldName, filename, content string) (string, io.Reader) {
var buf bytes.Buffer
mpw := multipart.NewWriter(&buf)
w, _ := mpw.CreateFormFile("content", "bar")
_, _ = w.Write([]byte("hello world!"))
_ = mpw.Close()
return mpw.FormDataContentType(), &buf
}

func TestController_UploadObject(t *testing.T) {
clt, deps := setupClientWithAdmin(t, "")
ctx := context.Background()

_, err := deps.catalog.CreateRepository(ctx, "my-new-repo", onBlock(deps, "foo1"), "main")
testutil.Must(t, err)

t.Run("upload object", func(t *testing.T) {
// write
contentType, buf := writeMultipart("content", "bar", "hello world!")
b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/bar",
}, contentType, buf)

testutil.Must(t, err)
if b.StatusCode() == 500 {
t.Fatalf("got 500 while uploading: %v", b.JSONDefault)
}
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}
})

t.Run("overwrite", func(t *testing.T) {
// write first
contentType, buf := writeMultipart("content", "baz1", "hello world!")
b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/baz1",
}, contentType, buf)
testutil.Must(t, err)
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}
// overwrite
contentType, buf = writeMultipart("content", "baz1", "something else!")
b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/baz1",
}, contentType, buf)

testutil.Must(t, err)
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}
})

t.Run("disable overwrite with if-none-match (uncommitted entry)", func(t *testing.T) {
// write first
contentType, buf := writeMultipart("content", "baz2", "hello world!")
b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/baz2",
}, contentType, buf)
testutil.Must(t, err)
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}
// overwrite
contentType, buf = writeMultipart("content", "baz2", "something else!")
all := "*"
b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{
Path: "foo/baz2",
IfNoneMatch: &all,
}, contentType, buf)

testutil.Must(t, err)
if b.StatusCode() != 412 {
t.Fatalf("expected 412 for UploadObject, got %d", b.StatusCode())
}
})

t.Run("disable overwrite with if-none-match (committed entry)", func(t *testing.T) {
_, err := deps.catalog.CreateBranch(ctx, "my-new-repo", "another-branch", "main")
testutil.Must(t, err)

// write first
contentType, buf := writeMultipart("content", "baz3", "hello world!")
b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "another-branch", &api.UploadObjectParams{
Path: "foo/baz3",
}, contentType, buf)
testutil.Must(t, err)
if b.StatusCode() != 201 {
t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode())
}

// commit
_, err = deps.catalog.Commit(ctx, "my-new-repo", "another-branch", "a commit!", "user1", nil)
testutil.Must(t, err)

// overwrite after commit
all := "*"
contentType, buf = writeMultipart("content", "baz3", "something else!")
b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "another-branch", &api.UploadObjectParams{
Path: "foo/baz3",
IfNoneMatch: &all,
}, contentType, buf)

testutil.Must(t, err)
if b.StatusCode() != 412 {
t.Fatalf("expected 412 for UploadObject, got %d", b.StatusCode())
}
})
}

func TestController_DeleteBranchHandler(t *testing.T) {
clt, deps := setupClientWithAdmin(t, "")
ctx := context.Background()
Expand Down
4 changes: 2 additions & 2 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func addressTypeToCatalog(t Entry_AddressType) AddressType {
}
}

func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch string, entry DBEntry) error {
func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch string, entry DBEntry, writeConditions ...graveler.WriteConditionOption) error {
repositoryID := graveler.RepositoryID(repository)
branchID := graveler.BranchID(branch)
ent := EntryFromCatalogEntry(entry)
Expand All @@ -671,7 +671,7 @@ func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch str
if err != nil {
return err
}
return c.Store.Set(ctx, repositoryID, branchID, key, *value)
return c.Store.Set(ctx, repositoryID, branchID, key, *value, writeConditions...)
}

func (c *Catalog) CreateEntries(ctx context.Context, repository string, branch string, entries []DBEntry) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/fake_graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (g *FakeGraveler) Get(_ context.Context, repositoryID graveler.RepositoryID
return v, nil
}

func (g *FakeGraveler) Set(_ context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, key graveler.Key, value graveler.Value) error {
func (g *FakeGraveler) Set(_ context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, key graveler.Key, value graveler.Value, _ ...graveler.WriteConditionOption) error {
if g.Err != nil {
return g.Err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type Interface interface {
// GetEntry returns the current entry for path in repository branch reference. Returns
// the entry with ExpiredError if it has expired from underlying storage.
GetEntry(ctx context.Context, repository, reference string, path string, params GetEntryParams) (*DBEntry, error)
CreateEntry(ctx context.Context, repository, branch string, entry DBEntry) error
CreateEntry(ctx context.Context, repository, branch string, entry DBEntry, writeConditions ...graveler.WriteConditionOption) error
CreateEntries(ctx context.Context, repository, branch string, entries []DBEntry) error
DeleteEntry(ctx context.Context, repository, branch string, path string) error
ListEntries(ctx context.Context, repository, reference string, prefix, after string, delimiter string, limit int) ([]*DBEntry, bool, error)
Expand Down
1 change: 1 addition & 0 deletions pkg/graveler/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
// TODO(ariels): Wrap with ErrUserVisible once db is gone.
ErrNotFound = wrapError(db.ErrNotFound, "not found")
ErrNotUnique = errors.New("not unique")
ErrPreconditionFailed = errors.New("precondition failed")
ErrInvalidValue = errors.New("invalid value")
ErrInvalidMergeBase = fmt.Errorf("only 2 commits allowed in FindMergeBase: %w", ErrInvalidValue)
ErrNoMergeBase = errors.New("no merge base")
Expand Down
Loading

0 comments on commit 7e611ed

Please sign in to comment.