Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate new repo isn't using existing storage namespace #3104

Merged
merged 6 commits into from
Mar 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions .github/workflows/nessie.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ jobs:

- name: Test lakeFS S3 with Spark 2.x
env:
STORAGE_NAMESPACE: s3://nessie-system-testing/${{ github.run_number }}-spark
REPOSITORY: gateway-test
STORAGE_NAMESPACE: s3://nessie-system-testing/${{ github.run_number }}-spark2
REPOSITORY: gateway-test-spark2
SONNET_JAR: sonnets-246/target/sonnets-246/scala-2.11/sonnets-246_2.11-0.1.0.jar
working-directory: test/spark
run: ./run-test.sh
Expand Down Expand Up @@ -396,8 +396,8 @@ jobs:
- name: Test lakeFS S3 with Spark 3.x
continue-on-error: true
env:
STORAGE_NAMESPACE: s3://nessie-system-testing/${{ github.run_number }}-spark
REPOSITORY: gateway-test
STORAGE_NAMESPACE: s3://nessie-system-testing/${{ github.run_number }}-spark3
REPOSITORY: gateway-test-spark3
SONNET_JAR: sonnets-311/target/sonnets-311/scala-2.12/sonnets-311_2.12-0.1.0.jar
working-directory: test/spark
run: ./run-test.sh
Expand Down Expand Up @@ -455,9 +455,6 @@ jobs:
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.NESSIE_AWS_ACCESS_KEY_ID }}
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.NESSIE_AWS_SECRET_ACCESS_KEY }}

- name: Copy repository ref
itaiad200 marked this conversation as resolved.
Show resolved Hide resolved
run: aws s3 cp --recursive s3://nessie-system-testing-data/golden-files/gc-test-data s3://nessie-system-testing/${{ github.run_number }}-storage-rclone-export

- name: Setup lakeFS for tests
env:
STORAGE_NAMESPACE: s3://nessie-system-testing/${{ github.run_number }}-storage-rclone-export
Expand Down
3 changes: 2 additions & 1 deletion nessie/lakectl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func TestLakectlBasicRepoActions(t *testing.T) {
// }

// Trying to create the same repo again fails and does not change the list
RunCmdAndVerifyFailureWithFile(t, Lakectl()+" repo create lakefs://"+repoName+" "+storage, false, "lakectl_repo_create_not_unique", vars)
newStorage := storage + "/new-storage/"
RunCmdAndVerifyFailureWithFile(t, Lakectl()+" repo create lakefs://"+repoName+" "+newStorage, false, "lakectl_repo_create_not_unique", vars)

// Fails due to the usage of repos for isolation - nessie creates repos in parallel and
// the output of 'repo list' command cannot be well defined
Expand Down
28 changes: 22 additions & 6 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,21 +1203,28 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo
return
}

err := ensureStorageNamespace(ctx, c.BlockAdapter, body.StorageNamespace)
err := c.ensureStorageNamespace(ctx, body.StorageNamespace)
if err != nil {
reason := "unknown"
var retErr error
var urlErr *url.Error
switch {
case errors.As(err, &urlErr) && urlErr.Op == "parse":
retErr = err
reason = "bad_url"
case errors.Is(err, block.ErrInvalidNamespace):
retErr = fmt.Errorf("%w, must match: %s", err, c.BlockAdapter.BlockstoreType())
reason = "invalid_namespace"
case errors.Is(err, errStorageNamespaceInUse):
retErr = err
reason = "already_in_use"
default:
retErr = ErrFailedToAccessStorage
}
c.Logger.
WithError(err).
WithField("storage_namespace", body.StorageNamespace).
WithField("reason", reason).
Warn("Could not access storage namespace")
writeError(w, http.StatusBadRequest, fmt.Errorf("failed to create repository: %w", retErr))
return
Expand All @@ -1238,19 +1245,28 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo
writeResponse(w, http.StatusCreated, response)
}

func ensureStorageNamespace(ctx context.Context, adapter block.Adapter, storageNamespace string) error {
var errStorageNamespaceInUse = errors.New("lakeFS repositories can't share storage namespace")

func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespace string) error {
const (
dummyKey = "dummy"
dummyData = "this is dummy data - created by lakeFS in order to check accessibility"
)

obj := block.ObjectPointer{StorageNamespace: storageNamespace, Identifier: dummyKey}
objLen := int64(len(dummyData))
err := adapter.Put(ctx, obj, objLen, strings.NewReader(dummyData), block.PutOpts{})
if err != nil {
if _, err := c.BlockAdapter.Get(ctx, obj, objLen); err == nil {
return fmt.Errorf("found lakeFS objects in the storage namespace(%s): %w",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("found lakeFS objects in the storage namespace(%s): %w",
return fmt.Errorf("%s: %w",

(and maybe rephrase the error itself if that doesn't look nice enough).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rather keep it as is. The first sentence makes it clearer IMO.

storageNamespace, errStorageNamespaceInUse)
} else if !errors.Is(err, adapter.ErrDataNotFound) {
return err
}
_, err = adapter.Get(ctx, obj, objLen)

if err := c.BlockAdapter.Put(ctx, obj, objLen, strings.NewReader(dummyData), block.PutOpts{}); err != nil {
return err
}

_, err := c.BlockAdapter.Get(ctx, obj, objLen)
return err
}

Expand Down Expand Up @@ -2291,7 +2307,7 @@ func (c *Controller) DumpRefs(w http.ResponseWriter, r *http.Request, repository
}
err = c.BlockAdapter.Put(ctx, block.ObjectPointer{
StorageNamespace: repo.StorageNamespace,
Identifier: "_lakefs/refs_manifest.json",
Identifier: fmt.Sprintf("%s/refs_manifest.json", c.Config.GetCommittedBlockStoragePrefix()),
}, int64(len(manifestBytes)), bytes.NewReader(manifestBytes), block.PutOpts{})
if err != nil {
writeError(w, http.StatusInternalServerError, err)
Expand Down
21 changes: 19 additions & 2 deletions pkg/api/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/go-test/deep"
nanoid "github.com/matoous/go-nanoid/v2"
"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/api"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/catalog"
Expand Down Expand Up @@ -184,6 +185,22 @@ func TestController_GetRepoHandler(t *testing.T) {
t.Fatalf("unexpected branch name %s, expected %s", repository.DefaultBranch, testBranchName)
}
})

t.Run("use same storage namespace twice", func(t *testing.T) {
name := testUniqueRepoName()
resp, err := clt.CreateRepositoryWithResponse(ctx, &api.CreateRepositoryParams{}, api.CreateRepositoryJSONRequestBody{
Name: name,
StorageNamespace: onBlock(deps, name),
})
verifyResponseOK(t, resp, err)

resp, err = clt.CreateRepositoryWithResponse(ctx, &api.CreateRepositoryParams{}, api.CreateRepositoryJSONRequestBody{
Name: name + "_2",
StorageNamespace: onBlock(deps, name),
})
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, resp.StatusCode())
})
}

func testCommitEntries(t *testing.T, ctx context.Context, cat catalog.Interface, deps *dependencies, params commitEntriesParams) string {
Expand Down Expand Up @@ -522,7 +539,7 @@ func TestController_CreateRepositoryHandler(t *testing.T) {
resp, err := clt.CreateRepositoryWithResponse(ctx, &api.CreateRepositoryParams{}, api.CreateRepositoryJSONRequestBody{
DefaultBranch: api.StringPtr("main"),
Name: "my-new-repo",
StorageNamespace: onBlock(deps, "foo-bucket"),
StorageNamespace: onBlock(deps, "foo-bucket-1"),
})
verifyResponseOK(t, resp, err)

Expand All @@ -540,7 +557,7 @@ func TestController_CreateRepositoryHandler(t *testing.T) {
resp, err := clt.CreateRepositoryWithResponse(ctx, &api.CreateRepositoryParams{}, api.CreateRepositoryJSONRequestBody{
DefaultBranch: api.StringPtr("main"),
Name: "repo2",
StorageNamespace: onBlock(deps, "foo-bucket"),
StorageNamespace: onBlock(deps, "foo-bucket-2"),
})
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 6 additions & 3 deletions test/spark/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ services:
extra_hosts:
- "s3.docker.lakefs.io:10.5.0.55"
- "example.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test-spark2.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test-spark3.s3.docker.lakefs.io:10.5.0.55"
- "thick-client-test.s3.docker.lakefs.io:10.5.0.55"
spark-worker:
image: docker.io/bitnami/spark:${SPARK_TAG:-3}
Expand All @@ -81,7 +82,8 @@ services:
- "s3.docker.lakefs.io:10.5.0.55"
- "docker.lakefs.io:10.5.0.55"
- "example.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test-spark2.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test-spark3.s3.docker.lakefs.io:10.5.0.55"
- "thick-client-test.s3.docker.lakefs.io:10.5.0.55"
spark-submit:
image: docker.io/bitnami/spark:${SPARK_TAG:-3}
Expand All @@ -106,7 +108,8 @@ services:
- "s3.docker.lakefs.io:10.5.0.55"
- "docker.lakefs.io:10.5.0.55"
- "example.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test-spark2.s3.docker.lakefs.io:10.5.0.55"
- "gateway-test-spark3.s3.docker.lakefs.io:10.5.0.55"
- "thick-client-test.s3.docker.lakefs.io:10.5.0.55"

networks:
Expand Down