From 9125bac5fd836b4a7df405f2afa9162c3e87869b Mon Sep 17 00:00:00 2001 From: Oz Katz Date: Tue, 27 Apr 2021 13:47:27 +0300 Subject: [PATCH 1/2] added support for put-if-absent operations --- api/swagger.yml | 16 +++ clients/python/docs/ObjectsApi.md | 5 +- .../python/lakefs_client/api/objects_api.py | 13 +++ docs/assets/js/swagger.yml | 16 +++ pkg/api/controller.go | 32 +++++- pkg/api/controller_test.go | 98 +++++++++++++++++++ pkg/catalog/catalog.go | 4 +- pkg/catalog/fake_graveler_test.go | 2 +- pkg/catalog/interface.go | 2 +- pkg/graveler/errors.go | 1 + pkg/graveler/graveler.go | 37 ++++++- pkg/graveler/staging/manager.go | 16 ++- pkg/graveler/staging/manager_test.go | 32 +++--- pkg/graveler/testutil/fakes.go | 2 +- 14 files changed, 247 insertions(+), 29 deletions(-) diff --git a/api/swagger.yml b/api/swagger.yml index 2c46e90c541..ad3bf2cff70 100644 --- a/api/swagger.yml +++ b/api/swagger.yml @@ -70,6 +70,12 @@ components: application/json: schema: $ref: "#/components/schemas/Error" + PreconditionFailed: + description: Precondition Failed + content: + application/json: + schema: + $ref: "#/components/schemas/Error" ValidationError: description: Validation Error content: @@ -2383,6 +2389,14 @@ paths: required: false schema: type: string + - in: header + name: If-None-Match + description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet. + example: "*" + required: false + schema: + type: string + pattern: '\*' # Currently, only "*" is supported responses: 201: description: object metadata @@ -2396,6 +2410,8 @@ paths: $ref: "#/components/responses/Unauthorized" 404: $ref: "#/components/responses/NotFound" + 412: + $ref: "#/components/responses/PreconditionFailed" default: $ref: "#/components/responses/ServerError" delete: diff --git a/clients/python/docs/ObjectsApi.md b/clients/python/docs/ObjectsApi.md index 81bfeb464a1..15a6e53bfdd 100644 --- a/clients/python/docs/ObjectsApi.md +++ b/clients/python/docs/ObjectsApi.md @@ -668,6 +668,7 @@ with lakefs_client.ApiClient(configuration) as api_client: branch = "branch_example" # str | path = "path_example" # str | storage_class = "storageClass_example" # str | (optional) + if_none_match = "*" # str | Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet. (optional) content = open('/path/to/file', 'rb') # file_type | Object content to upload (optional) # example passing only required values which don't have defaults set @@ -680,7 +681,7 @@ with lakefs_client.ApiClient(configuration) as api_client: # example passing only required values which don't have defaults set # and optional values try: - api_response = api_instance.upload_object(repository, branch, path, storage_class=storage_class, content=content) + api_response = api_instance.upload_object(repository, branch, path, storage_class=storage_class, if_none_match=if_none_match, content=content) pprint(api_response) except lakefs_client.ApiException as e: print("Exception when calling ObjectsApi->upload_object: %s\n" % e) @@ -695,6 +696,7 @@ Name | Type | Description | Notes **branch** | **str**| | **path** | **str**| | **storage_class** | **str**| | [optional] + **if_none_match** | **str**| Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet. | [optional] **content** | **file_type**| Object content to upload | [optional] ### Return type @@ -718,6 +720,7 @@ Name | Type | Description | Notes **400** | Validation Error | - | **401** | Unauthorized | - | **404** | Resource Not Found | - | +**412** | Precondition Failed | - | **0** | Internal Server Error | - | [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) diff --git a/clients/python/lakefs_client/api/objects_api.py b/clients/python/lakefs_client/api/objects_api.py index b0c5bb9f507..8b6f70b2d84 100644 --- a/clients/python/lakefs_client/api/objects_api.py +++ b/clients/python/lakefs_client/api/objects_api.py @@ -935,6 +935,7 @@ def __upload_object( Keyword Args: storage_class (str): [optional] + if_none_match (str): Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet.. [optional] content (file_type): Object content to upload. [optional] _return_http_data_only (bool): response data without head status code and headers. Default is True. @@ -1007,6 +1008,7 @@ def __upload_object( 'branch', 'path', 'storage_class', + 'if_none_match', 'content', ], 'required': [ @@ -1019,10 +1021,17 @@ def __upload_object( 'enum': [ ], 'validation': [ + 'if_none_match', ] }, root_map={ 'validations': { + ('if_none_match',): { + + 'regex': { + 'pattern': r'\*', # noqa: E501 + }, + }, }, 'allowed_values': { }, @@ -1035,6 +1044,8 @@ def __upload_object( (str,), 'storage_class': (str,), + 'if_none_match': + (str,), 'content': (file_type,), }, @@ -1043,6 +1054,7 @@ def __upload_object( 'branch': 'branch', 'path': 'path', 'storage_class': 'storageClass', + 'if_none_match': 'If-None-Match', 'content': 'content', }, 'location_map': { @@ -1050,6 +1062,7 @@ def __upload_object( 'branch': 'path', 'path': 'query', 'storage_class': 'query', + 'if_none_match': 'header', 'content': 'form', }, 'collection_format_map': { diff --git a/docs/assets/js/swagger.yml b/docs/assets/js/swagger.yml index 2c46e90c541..ad3bf2cff70 100644 --- a/docs/assets/js/swagger.yml +++ b/docs/assets/js/swagger.yml @@ -70,6 +70,12 @@ components: application/json: schema: $ref: "#/components/schemas/Error" + PreconditionFailed: + description: Precondition Failed + content: + application/json: + schema: + $ref: "#/components/schemas/Error" ValidationError: description: Validation Error content: @@ -2383,6 +2389,14 @@ paths: required: false schema: type: string + - in: header + name: If-None-Match + description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet. + example: "*" + required: false + schema: + type: string + pattern: '\*' # Currently, only "*" is supported responses: 201: description: object metadata @@ -2396,6 +2410,8 @@ paths: $ref: "#/components/responses/Unauthorized" 404: $ref: "#/components/responses/NotFound" + 412: + $ref: "#/components/responses/PreconditionFailed" default: $ref: "#/components/responses/ServerError" delete: diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 55ca8a9b57c..353d25e829e 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -1686,6 +1686,28 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi return } + // before writing body, ensure preconditions - this means we essentially check for object existence twice: + // once before uploading the body to save resources and time, + // and then graveler will check again when passed a WriteCondition. + allowOverwrite := true + if params.IfNoneMatch != nil { + if StringValue(params.IfNoneMatch) != "*" { + writeError(w, http.StatusBadRequest, fmt.Sprintf("Unsupported value for If-None-Match - Only \"*\" is supported")) + return + } + // check if exists + _, err := c.Catalog.GetEntry(ctx, repo.Name, branch, params.Path, catalog.GetEntryParams{ReturnExpired: true}) + if err == nil { + writeError(w, http.StatusPreconditionFailed, "path already exists") + return + } + if !errors.Is(err, catalog.ErrNotFound) { + writeError(w, http.StatusInternalServerError, err) + return + } + allowOverwrite = false + } + // write the content file, handler, err := r.FormFile("content") if err != nil { @@ -1714,7 +1736,15 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi Size: blob.Size, Checksum: blob.Checksum, } - err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry) + if allowOverwrite { + err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry) + } else { + err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, graveler.IfAbsent()) + } + if errors.Is(err, graveler.ErrPreconditionFailed) { + writeError(w, http.StatusPreconditionFailed, "path already exists") + return + } if handleAPIError(w, err) { return } diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index a7a30908565..54a6ee35320 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -690,6 +690,104 @@ func uploadObjectHelper(t testing.TB, ctx context.Context, clt api.ClientWithRes }, w.FormDataContentType(), &b) } +func writeMultipart(fieldName, filename, content string) (string, io.Reader) { + var buf bytes.Buffer + mpw := multipart.NewWriter(&buf) + w, _ := mpw.CreateFormFile("content", "bar") + _, _ = w.Write([]byte("hello world!")) + _ = mpw.Close() + return mpw.FormDataContentType(), &buf +} + +func TestController_UploadObject(t *testing.T) { + clt, deps := setupClientWithAdmin(t, "") + ctx := context.Background() + + t.Run("upload object", func(t *testing.T) { + _, err := deps.catalog.CreateRepository(ctx, "my-new-repo", onBlock(deps, "foo1"), "main") + testutil.Must(t, err) + // write + contentType, buf := writeMultipart("content", "bar", "hello world!") + b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{ + Path: "foo/bar", + }, contentType, buf) + + testutil.Must(t, err) + if b.StatusCode() == 500 { + t.Fatalf("got 500 while uploading: %v", b.JSONDefault) + } + if b.StatusCode() != 201 { + t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode()) + } + }) + + t.Run("overwrite", func(t *testing.T) { + // write + contentType, buf := writeMultipart("content", "bar", "hello world!") + b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{ + Path: "foo/bar", + }, contentType, buf) + + testutil.Must(t, err) + if b.StatusCode() == 500 { + t.Fatalf("got 500 while uploading: %v", b.JSONDefault) + } + if b.StatusCode() != 201 { + t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode()) + } + }) + + t.Run("disable overwrite with if-none-match (uncommitted entry)", func(t *testing.T) { + // write + contentType, buf := writeMultipart("content", "bar", "hello world!") + all := "*" + b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{ + Path: "foo/bar", + IfNoneMatch: &all, + }, contentType, buf) + + testutil.Must(t, err) + if b.StatusCode() != 412 { + t.Fatalf("expected 412 for UploadObject, got %d", b.StatusCode()) + } + }) + + t.Run("disable overwrite with if-none-match (committed entry)", func(t *testing.T) { + _, err := deps.catalog.CreateBranch(ctx, "my-new-repo", "another-branch", "main") + testutil.Must(t, err) + + // write first + contentType, buf := writeMultipart("content", "baz", "hello world!") + b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "another-branch", &api.UploadObjectParams{ + Path: "foo/baz", + }, contentType, buf) + testutil.Must(t, err) + if b.StatusCode() == 500 { + t.Fatalf("got 500 while uploading: %v", b.JSONDefault) + } + if b.StatusCode() != 201 { + t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode()) + } + + // commit + _, err = deps.catalog.Commit(ctx, "my-new-repo", "another-branch", "a commit!", "user1", nil) + testutil.Must(t, err) + + // overwrite after commit + all := "*" + contentType, buf = writeMultipart("content", "baz", "something else!") + b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "another-branch", &api.UploadObjectParams{ + Path: "foo/baz", + IfNoneMatch: &all, + }, contentType, buf) + + testutil.Must(t, err) + if b.StatusCode() != 412 { + t.Fatalf("expected 412 for UploadObject, got %d", b.StatusCode()) + } + }) +} + func TestController_DeleteBranchHandler(t *testing.T) { clt, deps := setupClientWithAdmin(t, "") ctx := context.Background() diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index d920225ca34..b086516ff92 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -654,7 +654,7 @@ func addressTypeToCatalog(t Entry_AddressType) AddressType { } } -func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch string, entry DBEntry) error { +func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch string, entry DBEntry, writeConditions ...graveler.WriteCondition) error { repositoryID := graveler.RepositoryID(repository) branchID := graveler.BranchID(branch) ent := EntryFromCatalogEntry(entry) @@ -671,7 +671,7 @@ func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch str if err != nil { return err } - return c.Store.Set(ctx, repositoryID, branchID, key, *value) + return c.Store.Set(ctx, repositoryID, branchID, key, *value, writeConditions...) } func (c *Catalog) CreateEntries(ctx context.Context, repository string, branch string, entries []DBEntry) error { diff --git a/pkg/catalog/fake_graveler_test.go b/pkg/catalog/fake_graveler_test.go index 9fe9490c312..451e3a0c98c 100644 --- a/pkg/catalog/fake_graveler_test.go +++ b/pkg/catalog/fake_graveler_test.go @@ -71,7 +71,7 @@ func (g *FakeGraveler) Get(_ context.Context, repositoryID graveler.RepositoryID return v, nil } -func (g *FakeGraveler) Set(_ context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, key graveler.Key, value graveler.Value) error { +func (g *FakeGraveler) Set(_ context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, key graveler.Key, value graveler.Value, _ ...graveler.WriteCondition) error { if g.Err != nil { return g.Err } diff --git a/pkg/catalog/interface.go b/pkg/catalog/interface.go index d74620ba62b..ae8f115eb28 100644 --- a/pkg/catalog/interface.go +++ b/pkg/catalog/interface.go @@ -82,7 +82,7 @@ type Interface interface { // GetEntry returns the current entry for path in repository branch reference. Returns // the entry with ExpiredError if it has expired from underlying storage. GetEntry(ctx context.Context, repository, reference string, path string, params GetEntryParams) (*DBEntry, error) - CreateEntry(ctx context.Context, repository, branch string, entry DBEntry) error + CreateEntry(ctx context.Context, repository, branch string, entry DBEntry, writeConditions ...graveler.WriteCondition) error CreateEntries(ctx context.Context, repository, branch string, entries []DBEntry) error DeleteEntry(ctx context.Context, repository, branch string, path string) error ListEntries(ctx context.Context, repository, reference string, prefix, after string, delimiter string, limit int) ([]*DBEntry, bool, error) diff --git a/pkg/graveler/errors.go b/pkg/graveler/errors.go index 3b5ecb6e27c..a7364cb4382 100644 --- a/pkg/graveler/errors.go +++ b/pkg/graveler/errors.go @@ -15,6 +15,7 @@ var ( // TODO(ariels): Wrap with ErrUserVisible once db is gone. ErrNotFound = wrapError(db.ErrNotFound, "not found") ErrNotUnique = errors.New("not unique") + ErrPreconditionFailed = errors.New("precondition failed") ErrInvalidValue = errors.New("invalid value") ErrInvalidMergeBase = fmt.Errorf("only 2 commits allowed in FindMergeBase: %w", ErrInvalidValue) ErrNoMergeBase = errors.New("no merge base") diff --git a/pkg/graveler/graveler.go b/pkg/graveler/graveler.go index 29ad13c81f5..3d2f0ec0913 100644 --- a/pkg/graveler/graveler.go +++ b/pkg/graveler/graveler.go @@ -55,6 +55,14 @@ type RangeInfo struct { Address string } +type WriteCondition struct { + ifAbsent bool +} + +func IfAbsent() WriteCondition { + return WriteCondition{ifAbsent: true} +} + // function/methods receiving the following basic types could assume they passed validation // StorageNamespace is the URI to the storage location @@ -245,7 +253,7 @@ type KeyValueStore interface { Get(ctx context.Context, repositoryID RepositoryID, ref Ref, key Key) (*Value, error) // Set stores value on repository / branch by key. nil value is a valid value for tombstone - Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value) error + Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value, writeConditions ...WriteCondition) error // Delete value from repository / branch branch by key Delete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key) error @@ -564,7 +572,7 @@ type StagingManager interface { Get(ctx context.Context, st StagingToken, key Key) (*Value, error) // Set writes a (possibly nil) value under the given staging token and key. - Set(ctx context.Context, st StagingToken, key Key, value *Value) error + Set(ctx context.Context, st StagingToken, key Key, value *Value, overwrite bool) error // List returns a ValueIterator for the given staging token List(ctx context.Context, st StagingToken) (ValueIterator, error) @@ -871,13 +879,32 @@ func (g *Graveler) Get(ctx context.Context, repositoryID RepositoryID, ref Ref, return g.CommittedManager.Get(ctx, repo.StorageNamespace, commit.MetaRangeID, key) } -func (g *Graveler) Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value) error { +func (g *Graveler) Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value, writeConditions ...WriteCondition) error { _, err := g.branchLocker.Writer(ctx, repositoryID, branchID, func() (interface{}, error) { branch, err := g.GetBranch(ctx, repositoryID, branchID) if err != nil { return nil, err } - err = g.StagingManager.Set(ctx, branch.StagingToken, key, &value) + allowOverwrite := true + for _, cond := range writeConditions { + if cond.ifAbsent { + allowOverwrite = false + } + } + if !allowOverwrite { + // ensure the given key doesn't exist in the underlying commit first + // Since we're being protected by the branch locker, we're guaranteed the commit + // won't change before we finish the operation + _, err := g.Get(ctx, repositoryID, Ref(branch.CommitID), key) + if err == nil { + // we got a key here already! + return nil, ErrPreconditionFailed + } else if !errors.Is(err, ErrNotFound) { + // another error occurred! + return nil, err + } + } + err = g.StagingManager.Set(ctx, branch.StagingToken, key, &value, allowOverwrite) return nil, err }) return err @@ -935,7 +962,7 @@ func (g *Graveler) Delete(ctx context.Context, repositoryID RepositoryID, branch return nil, ErrNotFound } - return nil, g.StagingManager.Set(ctx, branch.StagingToken, key, nil) + return nil, g.StagingManager.Set(ctx, branch.StagingToken, key, nil, true) }) return err } diff --git a/pkg/graveler/staging/manager.go b/pkg/graveler/staging/manager.go index 07cbcb907de..acb30e6f24e 100644 --- a/pkg/graveler/staging/manager.go +++ b/pkg/graveler/staging/manager.go @@ -41,13 +41,27 @@ func (p *Manager) Get(ctx context.Context, st graveler.StagingToken, key gravele return value, nil } -func (p *Manager) Set(ctx context.Context, st graveler.StagingToken, key graveler.Key, value *graveler.Value) error { +func (p *Manager) Set(ctx context.Context, st graveler.StagingToken, key graveler.Key, value *graveler.Value, overwrite bool) error { if value == nil { value = new(graveler.Value) } else if value.Identity == nil { return graveler.ErrInvalidValue } _, err := p.db.Transact(ctx, func(tx db.Tx) (interface{}, error) { + if !overwrite { + res, err := tx.Exec( + `INSERT INTO graveler_staging_kv (staging_token, key, identity, data) + VALUES ($1, $2, $3, $4) + ON CONFLICT (staging_token, key) DO NOTHING`, + st, key, value.Identity, value.Data) + if err != nil { + return nil, err + } + if res.RowsAffected() == 0 { + return nil, graveler.ErrPreconditionFailed + } + return res, err + } return tx.Exec(`INSERT INTO graveler_staging_kv (staging_token, key, identity, data) VALUES ($1, $2, $3, $4) ON CONFLICT (staging_token, key) DO UPDATE diff --git a/pkg/graveler/staging/manager_test.go b/pkg/graveler/staging/manager_test.go index 146d33237c1..def92240b1f 100644 --- a/pkg/graveler/staging/manager_test.go +++ b/pkg/graveler/staging/manager_test.go @@ -25,7 +25,7 @@ func TestSetGet(t *testing.T) { t.Fatalf("error different than expected. expected=%v, got=%v", graveler.ErrNotFound, err) } value := newTestValue("identity1", "value1") - err = s.Set(ctx, "t1", []byte("a/b/c/"), value) + err = s.Set(ctx, "t1", []byte("a/b/c/"), value, true) testutil.Must(t, err) e, err := s.Get(ctx, "t1", []byte("a/b/c/")) testutil.Must(t, err) @@ -40,14 +40,14 @@ func TestMultiToken(t *testing.T) { if !errors.Is(err, graveler.ErrNotFound) { t.Fatalf("error different than expected. expected=%v, got=%v", graveler.ErrNotFound, err) } - err = s.Set(ctx, "t1", []byte("a/b/c/"), newTestValue("identity1", "value1")) + err = s.Set(ctx, "t1", []byte("a/b/c/"), newTestValue("identity1", "value1"), true) testutil.Must(t, err) e, err := s.Get(ctx, "t1", []byte("a/b/c/")) testutil.Must(t, err) if string(e.Identity) != "identity1" { t.Errorf("got wrong identity. expected=%s, got=%s", "identity1", string(e.Identity)) } - err = s.Set(ctx, "t2", []byte("a/b/c/"), newTestValue("identity2", "value2")) + err = s.Set(ctx, "t2", []byte("a/b/c/"), newTestValue("identity2", "value2"), true) testutil.Must(t, err) e, err = s.Get(ctx, "t1", []byte("a/b/c/")) testutil.Must(t, err) @@ -66,9 +66,9 @@ func TestDrop(t *testing.T) { ctx, s := newTestStagingManager(t) numOfValues := 1400 for i := 0; i < numOfValues; i++ { - err := s.Set(ctx, "t1", []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i))) + err := s.Set(ctx, "t1", []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i)), true) testutil.Must(t, err) - err = s.Set(ctx, "t2", []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i))) + err = s.Set(ctx, "t2", []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i)), true) testutil.Must(t, err) } err := s.Drop(ctx, "t1") @@ -100,9 +100,9 @@ func TestDropByPrefix(t *testing.T) { ctx, s := newTestStagingManager(t) numOfValues := 2400 for i := 0; i < numOfValues; i++ { - err := s.Set(ctx, "t1", []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i))) + err := s.Set(ctx, "t1", []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i)), true) testutil.Must(t, err) - err = s.Set(ctx, "t2", []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i))) + err = s.Set(ctx, "t2", []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i)), true) testutil.Must(t, err) } err := s.DropByPrefix(ctx, "t1", []byte("key1")) @@ -215,7 +215,7 @@ func TestDropPrefixBytes(t *testing.T) { err := s.Set(ctx, st, k, &graveler.Value{ Identity: []byte{0, 0, 0, 0, 0, 0}, Data: []byte{0, 0, 0, 0, 0, 0}, - }) + }, true) testutil.Must(t, err) } err := s.DropByPrefix(ctx, st, tst.prefix) @@ -242,7 +242,7 @@ func TestList(t *testing.T) { for _, numOfValues := range []int{1, 100, 1000, 1500, 2500} { token := graveler.StagingToken(fmt.Sprintf("t_%d", numOfValues)) for i := 0; i < numOfValues; i++ { - err := s.Set(ctx, token, []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i))) + err := s.Set(ctx, token, []byte(fmt.Sprintf("key%04d", i)), newTestValue(fmt.Sprintf("identity%d", i), fmt.Sprintf("value%d", i)), true) testutil.Must(t, err) } res := make([]*graveler.ValueRecord, 0, numOfValues) @@ -272,7 +272,7 @@ func TestSeek(t *testing.T) { ctx, s := newTestStagingManager(t) numOfValues := 100 for i := 0; i < numOfValues; i++ { - err := s.Set(ctx, "t1", []byte(fmt.Sprintf("key%04d", i)), newTestValue("identity1", "value1")) + err := s.Set(ctx, "t1", []byte(fmt.Sprintf("key%04d", i)), newTestValue("identity1", "value1"), true) testutil.Must(t, err) } it, _ := s.List(ctx, "t1") @@ -305,9 +305,9 @@ func TestSeek(t *testing.T) { func TestNilValue(t *testing.T) { ctx, s := newTestStagingManager(t) - err := s.Set(ctx, "t1", []byte("key1"), nil) + err := s.Set(ctx, "t1", []byte("key1"), nil, true) testutil.Must(t, err) - err = s.Set(ctx, "t1", []byte("key2"), newTestValue("identity2", "value2")) + err = s.Set(ctx, "t1", []byte("key2"), newTestValue("identity2", "value2"), true) testutil.Must(t, err) e, err := s.Get(ctx, "t1", []byte("key1")) testutil.Must(t, err) @@ -337,12 +337,12 @@ func TestNilValue(t *testing.T) { func TestNilIdentity(t *testing.T) { ctx, s := newTestStagingManager(t) - err := s.Set(ctx, "t1", []byte("key1"), newTestValue("identity1", "value1")) + err := s.Set(ctx, "t1", []byte("key1"), newTestValue("identity1", "value1"), true) testutil.Must(t, err) err = s.Set(ctx, "t1", []byte("key1"), &graveler.Value{ Identity: nil, Data: []byte("value1"), - }) + }, true) if !errors.Is(err, graveler.ErrInvalidValue) { t.Fatalf("got unexpected error. expected=%v, got=%v", graveler.ErrInvalidValue, err) } @@ -370,7 +370,7 @@ func TestDeleteAndTombstone(t *testing.T) { }, } for _, val := range tombstoneValues { - err = s.Set(ctx, "t1", []byte("key1"), val) + err = s.Set(ctx, "t1", []byte("key1"), val, true) testutil.Must(t, err) e, err := s.Get(ctx, "t1", []byte("key1")) testutil.Must(t, err) @@ -393,7 +393,7 @@ func TestDeleteAndTombstone(t *testing.T) { } it.Close() } - err = s.Set(ctx, "t1", []byte("key1"), newTestValue("identity3", "value3")) + err = s.Set(ctx, "t1", []byte("key1"), newTestValue("identity3", "value3"), true) testutil.Must(t, err) e, err := s.Get(ctx, "t1", []byte("key1")) testutil.Must(t, err) diff --git a/pkg/graveler/testutil/fakes.go b/pkg/graveler/testutil/fakes.go index 1ab6ff57329..fa546b719dc 100644 --- a/pkg/graveler/testutil/fakes.go +++ b/pkg/graveler/testutil/fakes.go @@ -140,7 +140,7 @@ func (s *StagingFake) Get(context.Context, graveler.StagingToken, graveler.Key) return s.Value, nil } -func (s *StagingFake) Set(_ context.Context, _ graveler.StagingToken, key graveler.Key, value *graveler.Value) error { +func (s *StagingFake) Set(_ context.Context, _ graveler.StagingToken, key graveler.Key, value *graveler.Value, _ bool) error { if s.SetErr != nil { return s.SetErr } From 72fdec765d466e8d2c4f07ba12c3020a8abe5a8d Mon Sep 17 00:00:00 2001 From: Oz Katz Date: Tue, 27 Apr 2021 16:29:40 +0300 Subject: [PATCH 2/2] fixed tests to be able to run independently, better design for WriteCondition --- api/swagger.yml | 4 +- clients/python/docs/ObjectsApi.md | 4 +- .../python/lakefs_client/api/objects_api.py | 4 +- docs/assets/js/swagger.yml | 4 +- pkg/api/controller.go | 9 ++-- pkg/api/controller_test.go | 51 ++++++++++++------- pkg/catalog/catalog.go | 2 +- pkg/catalog/fake_graveler_test.go | 2 +- pkg/catalog/interface.go | 2 +- pkg/graveler/graveler.go | 30 ++++++----- pkg/graveler/staging/manager_test.go | 10 ++++ 11 files changed, 73 insertions(+), 49 deletions(-) diff --git a/api/swagger.yml b/api/swagger.yml index ad3bf2cff70..447e25f6692 100644 --- a/api/swagger.yml +++ b/api/swagger.yml @@ -2391,12 +2391,12 @@ paths: type: string - in: header name: If-None-Match - description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet. + description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet example: "*" required: false schema: type: string - pattern: '\*' # Currently, only "*" is supported + pattern: '^\*$' # Currently, only "*" is supported responses: 201: description: object metadata diff --git a/clients/python/docs/ObjectsApi.md b/clients/python/docs/ObjectsApi.md index 15a6e53bfdd..c8b7ec0c2ac 100644 --- a/clients/python/docs/ObjectsApi.md +++ b/clients/python/docs/ObjectsApi.md @@ -668,7 +668,7 @@ with lakefs_client.ApiClient(configuration) as api_client: branch = "branch_example" # str | path = "path_example" # str | storage_class = "storageClass_example" # str | (optional) - if_none_match = "*" # str | Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet. (optional) + if_none_match = "*" # str | Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet (optional) content = open('/path/to/file', 'rb') # file_type | Object content to upload (optional) # example passing only required values which don't have defaults set @@ -696,7 +696,7 @@ Name | Type | Description | Notes **branch** | **str**| | **path** | **str**| | **storage_class** | **str**| | [optional] - **if_none_match** | **str**| Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet. | [optional] + **if_none_match** | **str**| Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet | [optional] **content** | **file_type**| Object content to upload | [optional] ### Return type diff --git a/clients/python/lakefs_client/api/objects_api.py b/clients/python/lakefs_client/api/objects_api.py index 8b6f70b2d84..833a249bd4a 100644 --- a/clients/python/lakefs_client/api/objects_api.py +++ b/clients/python/lakefs_client/api/objects_api.py @@ -935,7 +935,7 @@ def __upload_object( Keyword Args: storage_class (str): [optional] - if_none_match (str): Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet.. [optional] + if_none_match (str): Currently supports only \"*\" to allow uploading an object only if one doesn't exist yet. [optional] content (file_type): Object content to upload. [optional] _return_http_data_only (bool): response data without head status code and headers. Default is True. @@ -1029,7 +1029,7 @@ def __upload_object( ('if_none_match',): { 'regex': { - 'pattern': r'\*', # noqa: E501 + 'pattern': r'^\*$', # noqa: E501 }, }, }, diff --git a/docs/assets/js/swagger.yml b/docs/assets/js/swagger.yml index ad3bf2cff70..447e25f6692 100644 --- a/docs/assets/js/swagger.yml +++ b/docs/assets/js/swagger.yml @@ -2391,12 +2391,12 @@ paths: type: string - in: header name: If-None-Match - description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet. + description: Currently supports only "*" to allow uploading an object only if one doesn't exist yet example: "*" required: false schema: type: string - pattern: '\*' # Currently, only "*" is supported + pattern: '^\*$' # Currently, only "*" is supported responses: 201: description: object metadata diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 353d25e829e..cc813674902 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -1692,7 +1692,7 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi allowOverwrite := true if params.IfNoneMatch != nil { if StringValue(params.IfNoneMatch) != "*" { - writeError(w, http.StatusBadRequest, fmt.Sprintf("Unsupported value for If-None-Match - Only \"*\" is supported")) + writeError(w, http.StatusBadRequest, "Unsupported value for If-None-Match - Only \"*\" is supported") return } // check if exists @@ -1736,11 +1736,8 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi Size: blob.Size, Checksum: blob.Checksum, } - if allowOverwrite { - err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry) - } else { - err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, graveler.IfAbsent()) - } + + err = c.Catalog.CreateEntry(ctx, repo.Name, branch, entry, graveler.IfAbsent(!allowOverwrite)) if errors.Is(err, graveler.ErrPreconditionFailed) { writeError(w, http.StatusPreconditionFailed, "path already exists") return diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 54a6ee35320..c2733d7f0f7 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -703,9 +703,10 @@ func TestController_UploadObject(t *testing.T) { clt, deps := setupClientWithAdmin(t, "") ctx := context.Background() + _, err := deps.catalog.CreateRepository(ctx, "my-new-repo", onBlock(deps, "foo1"), "main") + testutil.Must(t, err) + t.Run("upload object", func(t *testing.T) { - _, err := deps.catalog.CreateRepository(ctx, "my-new-repo", onBlock(deps, "foo1"), "main") - testutil.Must(t, err) // write contentType, buf := writeMultipart("content", "bar", "hello world!") b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{ @@ -722,27 +723,42 @@ func TestController_UploadObject(t *testing.T) { }) t.Run("overwrite", func(t *testing.T) { - // write - contentType, buf := writeMultipart("content", "bar", "hello world!") + // write first + contentType, buf := writeMultipart("content", "baz1", "hello world!") b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{ - Path: "foo/bar", + Path: "foo/baz1", }, contentType, buf) - testutil.Must(t, err) - if b.StatusCode() == 500 { - t.Fatalf("got 500 while uploading: %v", b.JSONDefault) + if b.StatusCode() != 201 { + t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode()) } + // overwrite + contentType, buf = writeMultipart("content", "baz1", "something else!") + b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{ + Path: "foo/baz1", + }, contentType, buf) + + testutil.Must(t, err) if b.StatusCode() != 201 { t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode()) } }) t.Run("disable overwrite with if-none-match (uncommitted entry)", func(t *testing.T) { - // write - contentType, buf := writeMultipart("content", "bar", "hello world!") - all := "*" + // write first + contentType, buf := writeMultipart("content", "baz2", "hello world!") b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{ - Path: "foo/bar", + Path: "foo/baz2", + }, contentType, buf) + testutil.Must(t, err) + if b.StatusCode() != 201 { + t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode()) + } + // overwrite + contentType, buf = writeMultipart("content", "baz2", "something else!") + all := "*" + b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "main", &api.UploadObjectParams{ + Path: "foo/baz2", IfNoneMatch: &all, }, contentType, buf) @@ -757,14 +773,11 @@ func TestController_UploadObject(t *testing.T) { testutil.Must(t, err) // write first - contentType, buf := writeMultipart("content", "baz", "hello world!") + contentType, buf := writeMultipart("content", "baz3", "hello world!") b, err := clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "another-branch", &api.UploadObjectParams{ - Path: "foo/baz", + Path: "foo/baz3", }, contentType, buf) testutil.Must(t, err) - if b.StatusCode() == 500 { - t.Fatalf("got 500 while uploading: %v", b.JSONDefault) - } if b.StatusCode() != 201 { t.Fatalf("expected 201 for UploadObject, got %d", b.StatusCode()) } @@ -775,9 +788,9 @@ func TestController_UploadObject(t *testing.T) { // overwrite after commit all := "*" - contentType, buf = writeMultipart("content", "baz", "something else!") + contentType, buf = writeMultipart("content", "baz3", "something else!") b, err = clt.UploadObjectWithBodyWithResponse(ctx, "my-new-repo", "another-branch", &api.UploadObjectParams{ - Path: "foo/baz", + Path: "foo/baz3", IfNoneMatch: &all, }, contentType, buf) diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index b086516ff92..79a6aa0e509 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -654,7 +654,7 @@ func addressTypeToCatalog(t Entry_AddressType) AddressType { } } -func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch string, entry DBEntry, writeConditions ...graveler.WriteCondition) error { +func (c *Catalog) CreateEntry(ctx context.Context, repository string, branch string, entry DBEntry, writeConditions ...graveler.WriteConditionOption) error { repositoryID := graveler.RepositoryID(repository) branchID := graveler.BranchID(branch) ent := EntryFromCatalogEntry(entry) diff --git a/pkg/catalog/fake_graveler_test.go b/pkg/catalog/fake_graveler_test.go index 451e3a0c98c..bf5cca35f44 100644 --- a/pkg/catalog/fake_graveler_test.go +++ b/pkg/catalog/fake_graveler_test.go @@ -71,7 +71,7 @@ func (g *FakeGraveler) Get(_ context.Context, repositoryID graveler.RepositoryID return v, nil } -func (g *FakeGraveler) Set(_ context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, key graveler.Key, value graveler.Value, _ ...graveler.WriteCondition) error { +func (g *FakeGraveler) Set(_ context.Context, repositoryID graveler.RepositoryID, branchID graveler.BranchID, key graveler.Key, value graveler.Value, _ ...graveler.WriteConditionOption) error { if g.Err != nil { return g.Err } diff --git a/pkg/catalog/interface.go b/pkg/catalog/interface.go index ae8f115eb28..fd90a0453b8 100644 --- a/pkg/catalog/interface.go +++ b/pkg/catalog/interface.go @@ -82,7 +82,7 @@ type Interface interface { // GetEntry returns the current entry for path in repository branch reference. Returns // the entry with ExpiredError if it has expired from underlying storage. GetEntry(ctx context.Context, repository, reference string, path string, params GetEntryParams) (*DBEntry, error) - CreateEntry(ctx context.Context, repository, branch string, entry DBEntry, writeConditions ...graveler.WriteCondition) error + CreateEntry(ctx context.Context, repository, branch string, entry DBEntry, writeConditions ...graveler.WriteConditionOption) error CreateEntries(ctx context.Context, repository, branch string, entries []DBEntry) error DeleteEntry(ctx context.Context, repository, branch string, path string) error ListEntries(ctx context.Context, repository, reference string, prefix, after string, delimiter string, limit int) ([]*DBEntry, bool, error) diff --git a/pkg/graveler/graveler.go b/pkg/graveler/graveler.go index 3d2f0ec0913..0a0b60dcaee 100644 --- a/pkg/graveler/graveler.go +++ b/pkg/graveler/graveler.go @@ -56,11 +56,15 @@ type RangeInfo struct { } type WriteCondition struct { - ifAbsent bool + IfAbsent bool } -func IfAbsent() WriteCondition { - return WriteCondition{ifAbsent: true} +type WriteConditionOption func(condition *WriteCondition) + +func IfAbsent(v bool) WriteConditionOption { + return func(condition *WriteCondition) { + condition.IfAbsent = v + } } // function/methods receiving the following basic types could assume they passed validation @@ -253,7 +257,7 @@ type KeyValueStore interface { Get(ctx context.Context, repositoryID RepositoryID, ref Ref, key Key) (*Value, error) // Set stores value on repository / branch by key. nil value is a valid value for tombstone - Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value, writeConditions ...WriteCondition) error + Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value, writeConditions ...WriteConditionOption) error // Delete value from repository / branch branch by key Delete(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key) error @@ -879,32 +883,32 @@ func (g *Graveler) Get(ctx context.Context, repositoryID RepositoryID, ref Ref, return g.CommittedManager.Get(ctx, repo.StorageNamespace, commit.MetaRangeID, key) } -func (g *Graveler) Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value, writeConditions ...WriteCondition) error { +func (g *Graveler) Set(ctx context.Context, repositoryID RepositoryID, branchID BranchID, key Key, value Value, writeConditions ...WriteConditionOption) error { _, err := g.branchLocker.Writer(ctx, repositoryID, branchID, func() (interface{}, error) { branch, err := g.GetBranch(ctx, repositoryID, branchID) if err != nil { return nil, err } - allowOverwrite := true + writeCondition := &WriteCondition{} for _, cond := range writeConditions { - if cond.ifAbsent { - allowOverwrite = false - } + cond(writeCondition) } - if !allowOverwrite { - // ensure the given key doesn't exist in the underlying commit first + + if writeCondition.IfAbsent { + // Ensure the given key doesn't exist in the underlying commit first // Since we're being protected by the branch locker, we're guaranteed the commit // won't change before we finish the operation _, err := g.Get(ctx, repositoryID, Ref(branch.CommitID), key) if err == nil { // we got a key here already! return nil, ErrPreconditionFailed - } else if !errors.Is(err, ErrNotFound) { + } + if !errors.Is(err, ErrNotFound) { // another error occurred! return nil, err } } - err = g.StagingManager.Set(ctx, branch.StagingToken, key, &value, allowOverwrite) + err = g.StagingManager.Set(ctx, branch.StagingToken, key, &value, !writeCondition.IfAbsent) return nil, err }) return err diff --git a/pkg/graveler/staging/manager_test.go b/pkg/graveler/staging/manager_test.go index def92240b1f..0715dfa8b0e 100644 --- a/pkg/graveler/staging/manager_test.go +++ b/pkg/graveler/staging/manager_test.go @@ -32,6 +32,16 @@ func TestSetGet(t *testing.T) { if string(e.Identity) != "identity1" { t.Errorf("got wrong value. expected=%s, got=%s", "identity1", string(e.Identity)) } + + t.Run("test overwrites", func(t *testing.T) { + err = s.Set(ctx, "t2", []byte("a/b/c/d"), value, false) + testutil.Must(t, err) + + err = s.Set(ctx, "t2", []byte("a/b/c/d"), value, false) + if err != graveler.ErrPreconditionFailed { + t.Fatalf("expected a precondition error when overwriting") + } + }) } func TestMultiToken(t *testing.T) {