Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lakefs new repositories with import branch as parent branch by default #810

Merged
merged 10 commits into from
Oct 15, 2020
80 changes: 1 addition & 79 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/treeverse/lakefs/dedup"
"github.com/treeverse/lakefs/httputil"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/onboard"
"github.com/treeverse/lakefs/permissions"
"github.com/treeverse/lakefs/retention"
"github.com/treeverse/lakefs/stats"
Expand Down Expand Up @@ -161,7 +160,6 @@ func (c *Controller) Configure(api *operations.LakefsAPI) {
api.RepositoriesGetRepositoryHandler = c.GetRepoHandler()
api.RepositoriesCreateRepositoryHandler = c.CreateRepositoryHandler()
api.RepositoriesDeleteRepositoryHandler = c.DeleteRepositoryHandler()
api.RepositoriesImportFromS3InventoryHandler = c.ImportFromS3InventoryHandler()

api.BranchesListBranchesHandler = c.ListBranchesHandler()
api.BranchesGetBranchHandler = c.GetBranchHandler()
Expand Down Expand Up @@ -545,7 +543,7 @@ func (c *Controller) CreateRepositoryHandler() repositories.CreateRepositoryHand
return repositories.NewCreateRepositoryBadRequest().
WithPayload(responseError("error creating repository: could not access storage namespace"))
}
err = deps.Cataloger.CreateRepository(c.Context(),
repo, err := deps.Cataloger.CreateRepository(c.Context(),
swag.StringValue(params.Repository.ID),
swag.StringValue(params.Repository.StorageNamespace),
params.Repository.DefaultBranch)
Expand All @@ -554,12 +552,6 @@ func (c *Controller) CreateRepositoryHandler() repositories.CreateRepositoryHand
WithPayload(responseError(fmt.Sprintf("error creating repository: %s", err)))
}

repo, err := deps.Cataloger.GetRepository(c.Context(), swag.StringValue(params.Repository.ID))
if err != nil {
return repositories.NewGetRepositoryDefault(http.StatusInternalServerError).
WithPayload(responseError(fmt.Sprintf("error creating repository: %s", err)))
}

return repositories.NewCreateRepositoryCreated().WithPayload(&models.Repository{
StorageNamespace: repo.StorageNamespace,
CreationDate: repo.CreationDate.Unix(),
Expand Down Expand Up @@ -2234,76 +2226,6 @@ func (c *Controller) RetentionUpdateRetentionPolicyHandler() retentionop.UpdateR
})
}

func (c *Controller) ImportFromS3InventoryHandler() repositories.ImportFromS3InventoryHandler {
return repositories.ImportFromS3InventoryHandlerFunc(func(params repositories.ImportFromS3InventoryParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
{
Action: permissions.CreateRepositoryAction,
Resource: permissions.RepoArn(params.Repository),
},
})
if err != nil {
return repositories.NewImportFromS3InventoryUnauthorized().WithPayload(responseErrorFrom(err))
}
deps.LogAction("import_from_s3_inventory")
userModel, err := c.deps.Auth.GetUser(user.ID)
username := "lakeFS"
if err == nil {
username = userModel.Username
}
importConfig := &onboard.Config{
CommitUsername: username,
InventoryURL: params.ManifestURL,
Repository: params.Repository,
InventoryGenerator: deps.BlockAdapter,
Cataloger: deps.Cataloger,
}
importer, err := onboard.CreateImporter(deps.ctx, deps.logger, importConfig)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
var importStats *onboard.Stats
dryRun := swag.BoolValue(params.DryRun)
if dryRun {
importStats, err = importer.Import(deps.ctx, true)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
} else {
repo, err := deps.Cataloger.GetRepository(c.Context(), params.Repository)
if err != nil {
return repositories.NewImportFromS3InventoryNotFound().
WithPayload(responseErrorFrom(err))
}
_, err = deps.Cataloger.GetBranchReference(deps.ctx, params.Repository, onboard.DefaultBranchName)
if errors.Is(err, db.ErrNotFound) {
_, err = deps.Cataloger.CreateBranch(deps.ctx, params.Repository, onboard.DefaultBranchName, repo.DefaultBranch)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
} else if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
importStats, err = importer.Import(params.HTTPRequest.Context(), false)
if err != nil {
return repositories.NewImportFromS3InventoryDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}
}
return repositories.NewImportFromS3InventoryCreated().WithPayload(&repositories.ImportFromS3InventoryCreatedBody{
IsDryRun: dryRun,
PreviousImportDate: importStats.PreviousImportDate.Unix(),
PreviousManifest: importStats.PreviousInventoryURL,
AddedOrChanged: int64(importStats.AddedOrChanged),
Deleted: int64(importStats.Deleted),
})
})
}

func (c *Controller) ConfigGetConfigHandler() configop.GetConfigHandler {
return configop.GetConfigHandlerFunc(func(params configop.GetConfigParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
Expand Down
86 changes: 51 additions & 35 deletions api/api_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ func TestHandler_ListRepositoriesHandler(t *testing.T) {
t.Run("list some repos", func(t *testing.T) {
// write some repos
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "foo2", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "foo3", "s3://foo1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "foo2", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "foo3", "s3://foo1", "master")
testutil.Must(t, err)

resp, err := clt.Repositories.ListRepositories(&repositories.ListRepositoriesParams{},
httptransport.BasicAuth(creds.AccessKeyID, creds.AccessSecretKey))
Expand Down Expand Up @@ -158,8 +161,9 @@ func TestHandler_GetRepoHandler(t *testing.T) {

t.Run("get existing repo", func(t *testing.T) {
const testBranchName = "non-default"
testutil.Must(t,
deps.cataloger.CreateRepository(context.Background(), "foo1", "s3://foo1", testBranchName))
_, err := deps.cataloger.CreateRepository(context.Background(), "foo1", "s3://foo1", testBranchName)
testutil.Must(t, err)

resp, err := clt.Repositories.GetRepository(&repositories.GetRepositoryParams{
Repository: "foo1",
}, httptransport.BasicAuth(creds.AccessKeyID, creds.AccessSecretKey))
Expand Down Expand Up @@ -188,7 +192,7 @@ func TestHandler_CommitsGetBranchCommitLogHandler(t *testing.T) {

ctx := context.Background()
t.Run("get missing branch", func(t *testing.T) {
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand All @@ -202,7 +206,7 @@ func TestHandler_CommitsGetBranchCommitLogHandler(t *testing.T) {
})

t.Run("get branch log", func(t *testing.T) {
err := deps.cataloger.CreateRepository(ctx, "repo2", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo2", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand All @@ -225,7 +229,7 @@ func TestHandler_CommitsGetBranchCommitLogHandler(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error getting log of commits: %s", err)
}
const expectedCommits = commitsLen + 1 // one for the branch creation
const expectedCommits = commitsLen + 2 // one for the branch creation + import branch
commitsLog := resp.GetPayload().Results
if len(commitsLog) != expectedCommits {
t.Fatalf("Log %d commits, expected %d", len(commitsLog), expectedCommits)
Expand Down Expand Up @@ -260,7 +264,7 @@ func TestHandler_GetCommitHandler(t *testing.T) {

t.Run("get existing commit", func(t *testing.T) {
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master")
testutil.Must(t, err)
testutil.MustDo(t, "create entry bar1", deps.cataloger.CreateEntry(ctx, "foo1", "master",
catalog.Entry{Path: "foo/bar1", PhysicalAddress: "bar1addr", CreationDate: time.Now(), Size: 1, Checksum: "cksum1"},
Expand Down Expand Up @@ -322,13 +326,13 @@ func TestHandler_CommitHandler(t *testing.T) {

t.Run("commit success", func(t *testing.T) {
ctx := context.Background()
testutil.MustDo(t, "create repo foo1",
deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "foo1", "s3://foo1", "master")
testutil.MustDo(t, "create repo foo1", err)
testutil.MustDo(t, "commit bar on foo1", deps.cataloger.CreateEntry(ctx, "foo1", "master",
catalog.Entry{Path: "foo/bar", PhysicalAddress: "pa", CreationDate: time.Now(), Size: 666, Checksum: "cs", Metadata: nil},
catalog.CreateEntryParams{},
))
_, err := clt.Commits.Commit(&commits.CommitParams{
_, err = clt.Commits.Commit(&commits.CommitParams{
Branch: "master",
Commit: &models.CommitCreation{
Message: swag.String("some message"),
Expand Down Expand Up @@ -373,7 +377,7 @@ func TestHandler_CreateRepositoryHandler(t *testing.T) {

t.Run("create repo duplicate", func(t *testing.T) {
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo2", "s3://foo1/", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo2", "s3://foo1/", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -404,9 +408,10 @@ func TestHandler_DeleteRepositoryHandler(t *testing.T) {

ctx := context.Background()
t.Run("delete repo success", func(t *testing.T) {
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "my-new-repo", "s3://foo1/", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "my-new-repo", "s3://foo1/", "master")
testutil.Must(t, err)

_, err := clt.Repositories.DeleteRepository(&repositories.DeleteRepositoryParams{
_, err = clt.Repositories.DeleteRepository(&repositories.DeleteRepositoryParams{
Repository: "my-new-repo",
}, bauth)

Expand All @@ -431,11 +436,15 @@ func TestHandler_DeleteRepositoryHandler(t *testing.T) {
})

t.Run("delete repo doesnt delete other repos", func(t *testing.T) {
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "rr0", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "rr1", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "rr11", "s3://foo1", "master"))
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "rr2", "s3://foo1", "master"))
_, err := clt.Repositories.DeleteRepository(&repositories.DeleteRepositoryParams{
_, err := deps.cataloger.CreateRepository(ctx, "rr0", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "rr1", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "rr11", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateRepository(ctx, "rr2", "s3://foo1", "master")
testutil.Must(t, err)
_, err = clt.Repositories.DeleteRepository(&repositories.DeleteRepositoryParams{
Repository: "rr1",
}, bauth)

Expand Down Expand Up @@ -473,22 +482,26 @@ func TestHandler_ListBranchesHandler(t *testing.T) {

t.Run("list branches only default", func(t *testing.T) {
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", "master")
testutil.Must(t, err)
resp, err := clt.Branches.ListBranches(&branches.ListBranchesParams{
Amount: swag.Int64(-1),
Repository: "repo1",
}, bauth)
if err != nil {
t.Fatalf("unexpected error listing branches: %s", err)
}
if len(resp.GetPayload().Results) != 1 {
t.Fatalf("expected 1 branch, got %d", len(resp.GetPayload().Results))
const expectedBranchesLen = 2 // branch creation and import branch
branchesLen := len(resp.GetPayload().Results)
if branchesLen != expectedBranchesLen {
t.Fatalf("ListBranches len=%d, expected %d", branchesLen, expectedBranchesLen)
}
})

t.Run("list branches pagination", func(t *testing.T) {
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "repo2", "s3://foo2", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "repo2", "s3://foo2", "master")
testutil.Must(t, err)
for i := 0; i < 7; i++ {
branchName := "master" + strconv.Itoa(i+1)
_, err := deps.cataloger.CreateBranch(ctx, "repo2", branchName, "master")
Expand Down Expand Up @@ -550,7 +563,8 @@ func TestHandler_GetBranchHandler(t *testing.T) {
t.Run("get default branch", func(t *testing.T) {
ctx := context.Background()
const testBranch = "master"
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", testBranch))
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", testBranch)
testutil.Must(t, err)
resp, err := clt.Branches.GetBranch(&branches.GetBranchParams{
Branch: testBranch,
Repository: "repo1",
Expand Down Expand Up @@ -598,7 +612,8 @@ func TestHandler_CreateBranchHandler(t *testing.T) {

t.Run("create branch success", func(t *testing.T) {
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "s3://foo1", "master")
testutil.Must(t, err)
const newBranchName = "master2"
resp, err := clt.Branches.CreateBranch(&branches.CreateBranchParams{
Branch: &models.BranchCreation{
Expand Down Expand Up @@ -656,8 +671,9 @@ func TestHandler_DeleteBranchHandler(t *testing.T) {

t.Run("delete branch success", func(t *testing.T) {
ctx := context.Background()
testutil.Must(t, deps.cataloger.CreateRepository(ctx, "my-new-repo", "s3://foo1", "master"))
_, err := deps.cataloger.CreateBranch(ctx, "my-new-repo", "master2", "master")
_, err := deps.cataloger.CreateRepository(ctx, "my-new-repo", "s3://foo1", "master")
testutil.Must(t, err)
_, err = deps.cataloger.CreateBranch(ctx, "my-new-repo", "master2", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -701,7 +717,7 @@ func TestHandler_ObjectsStatObjectHandler(t *testing.T) {
clt.SetTransport(&handlerTransport{Handler: handler})

ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -790,8 +806,8 @@ func TestHandler_ObjectsListObjectsHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
testutil.Must(t,
deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master"))
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
testutil.Must(t, err)
testutil.Must(t,
deps.cataloger.CreateEntry(ctx, "repo1", "master", catalog.Entry{
Path: "foo/bar",
Expand Down Expand Up @@ -888,7 +904,7 @@ func TestHandler_ObjectsGetObjectHandler(t *testing.T) {
// setup client
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -995,7 +1011,7 @@ func TestHandler_ObjectsUploadObjectHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1095,7 +1111,7 @@ func TestHandler_ObjectsDeleteObjectHandler(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1265,7 +1281,7 @@ func TestHandler_RetentionPolicyHandlers(t *testing.T) {
clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})
ctx := context.Background()
err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
_, err := deps.cataloger.CreateRepository(ctx, "repo1", "ns1", "master")
if err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 5 additions & 4 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
)

const (
CatalogerCommitter = ""

DefaultPathDelimiter = "/"
CatalogerCommitter = ""
DefaultBranchName = "master"
DefaultImportBranchName = "import-from-inventory"
DefaultPathDelimiter = "/"

dedupBatchSize = 10
dedupBatchTimeout = 50 * time.Millisecond
Expand Down Expand Up @@ -58,7 +59,7 @@ type ExpireResult struct {
}

type RepositoryCataloger interface {
CreateRepository(ctx context.Context, repository string, storageNamespace string, branch string) error
CreateRepository(ctx context.Context, repository string, storageNamespace string, branch string) (*Repository, error)
GetRepository(ctx context.Context, repository string) (*Repository, error)
DeleteRepository(ctx context.Context, repository string) error
ListRepositories(ctx context.Context, limit int, after string) ([]*Repository, bool, error)
Expand Down
4 changes: 2 additions & 2 deletions catalog/cataloger_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func TestCataloger_Commit(t *testing.T) {
name: "simple",
args: args{repository: repository, branch: "master", message: "Simple commit", committer: "tester", metadata: meta},
want: &CommitLog{
Reference: "~KJ8Wd1Rs96Z",
Reference: "~KJ8Wd1Rs96a",
Committer: "tester",
Message: "Simple commit",
CreationDate: time.Now(),
Metadata: meta,
Parents: []string{"~KJ8Wd1Rs96Y"},
Parents: []string{"~KJ8Wd1Rs96Z"},
},
wantErr: false,
},
Expand Down
Loading