Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add management api for continuous export of branches #844

Merged
merged 6 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/go-openapi/runtime"
"github.com/go-openapi/runtime/middleware"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/treeverse/lakefs/api/gen/models"
"github.com/treeverse/lakefs/api/gen/restapi/operations"
authop "github.com/treeverse/lakefs/api/gen/restapi/operations/auth"
"github.com/treeverse/lakefs/api/gen/restapi/operations/branches"
"github.com/treeverse/lakefs/api/gen/restapi/operations/commits"
configop "github.com/treeverse/lakefs/api/gen/restapi/operations/config"
exportop "github.com/treeverse/lakefs/api/gen/restapi/operations/export"
hcop "github.com/treeverse/lakefs/api/gen/restapi/operations/health_check"
metadataop "github.com/treeverse/lakefs/api/gen/restapi/operations/metadata"
"github.com/treeverse/lakefs/api/gen/restapi/operations/objects"
Expand Down Expand Up @@ -184,8 +186,12 @@ func (c *Controller) Configure(api *operations.LakefsAPI) {

api.RetentionGetRetentionPolicyHandler = c.RetentionGetRetentionPolicyHandler()
api.RetentionUpdateRetentionPolicyHandler = c.RetentionUpdateRetentionPolicyHandler()

api.MetadataCreateSymlinkHandler = c.MetadataCreateSymlinkHandler()

api.ExportGetContinuousExportHandler = c.ExportGetContinuousExportHandler()
api.ExportSetContinuousExportHandler = c.ExportSetContinuousExportHandler()

api.ConfigGetConfigHandler = c.ConfigGetConfigHandler()
}

Expand Down Expand Up @@ -2179,6 +2185,74 @@ func (c *Controller) DetachPolicyFromGroupHandler() authop.DetachPolicyFromGroup
})
}

func (c *Controller) ExportGetContinuousExportHandler() exportop.GetContinuousExportHandler {
return exportop.GetContinuousExportHandlerFunc(func(params exportop.GetContinuousExportParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
{
Action: permissions.ListBranchesAction,
Resource: permissions.BranchArn(params.Repository, params.Branch),
},
})
if err != nil {
return exportop.NewGetContinuousExportUnauthorized().
WithPayload(responseErrorFrom(err))
}

deps.LogAction("get_continuous_export")

config, err := deps.Cataloger.GetExportConfigurationForBranch(params.Repository, params.Branch)
if errors.Is(err, catalog.ErrRepositoryNotFound) || errors.Is(err, catalog.ErrBranchNotFound) {
return exportop.NewGetContinuousExportNotFound().
WithPayload(responseErrorFrom(err))
}
if err != nil {
return exportop.NewGetContinuousExportDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}

payload := models.ContinuousExportConfiguration{
ExportPath: strfmt.URI(config.Path),
ExportStatusPath: strfmt.URI(config.StatusPath),
LastKeysInPrefixRegexp: config.LastKeysInPrefixRegexp,
}
return exportop.NewGetContinuousExportOK().WithPayload(&payload)
})
}

func (c *Controller) ExportSetContinuousExportHandler() exportop.SetContinuousExportHandlerFunc {
return exportop.SetContinuousExportHandlerFunc(func(params exportop.SetContinuousExportParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
{
Action: permissions.CreateBranchAction,
Resource: permissions.BranchArn(params.Repository, params.Branch),
},
})
if err != nil {
return exportop.NewSetContinuousExportUnauthorized().
WithPayload(responseErrorFrom(err))
}

deps.LogAction("set_continuous_export")

config := catalog.ExportConfiguration{
Path: params.Config.ExportPath.String(),
StatusPath: params.Config.ExportStatusPath.String(),
LastKeysInPrefixRegexp: params.Config.LastKeysInPrefixRegexp,
}
err = deps.Cataloger.PutExportConfiguration(params.Repository, params.Branch, &config)
if errors.Is(err, catalog.ErrRepositoryNotFound) || errors.Is(err, catalog.ErrBranchNotFound) {
return exportop.NewSetContinuousExportNotFound().
WithPayload(responseErrorFrom(err))
}
if err != nil {
return exportop.NewSetContinuousExportDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}

return exportop.NewSetContinuousExportCreated()
})
}

func (c *Controller) RetentionGetRetentionPolicyHandler() retentionop.GetRetentionPolicyHandler {
return retentionop.GetRetentionPolicyHandlerFunc(func(params retentionop.GetRetentionPolicyParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
Expand Down
89 changes: 89 additions & 0 deletions api/api_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (

"github.com/go-openapi/runtime"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/go-test/deep"
"github.com/treeverse/lakefs/api/gen/client"
"github.com/treeverse/lakefs/api/gen/client/auth"
"github.com/treeverse/lakefs/api/gen/client/branches"
"github.com/treeverse/lakefs/api/gen/client/commits"
"github.com/treeverse/lakefs/api/gen/client/config"
"github.com/treeverse/lakefs/api/gen/client/export"
"github.com/treeverse/lakefs/api/gen/client/objects"
"github.com/treeverse/lakefs/api/gen/client/repositories"
"github.com/treeverse/lakefs/api/gen/client/retention"
Expand Down Expand Up @@ -1354,6 +1356,93 @@ func TestHandler_ConfigHandlers(t *testing.T) {
})
}

func TestHandler_ContinuousExportHandlers(t *testing.T) {
const (
repo = "repo-for-continuous-export-test"
branch = "main"
anotherBranch = "notMain"
)
handler, deps := getHandler(t, "")

creds := createDefaultAdminUser(deps.auth, t)
bauth := httptransport.BasicAuth(creds.AccessKeyID, creds.AccessSecretKey)

clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})

ctx := context.Background()
_, err := deps.cataloger.CreateRepository(ctx, repo, "s3://foo1", branch)
testutil.MustDo(t, "create repository", err)

config := models.ContinuousExportConfiguration{
ExportPath: strfmt.URI("s3://bucket/export"),
ExportStatusPath: strfmt.URI("s3://bucket/report"),
LastKeysInPrefixRegexp: []string{"^_success$", ".*/_success$"},
}

res, err := clt.Export.SetContinuousExport(&export.SetContinuousExportParams{
Repository: repo,
Branch: branch,
Config: &config,
}, bauth)
testutil.MustDo(t, "initial continuous export configuration", err)
if res == nil {
t.Fatalf("initial continuous export configuration: expected OK but got nil")
}

t.Run("get missing branch configuration", func(t *testing.T) {
res, err := clt.Export.GetContinuousExport(&export.GetContinuousExportParams{
Repository: repo,
Branch: anotherBranch,
}, bauth)
if err == nil || res != nil {
t.Fatalf("expected get to return an error but got result %v, error nil", res)
}
if _, ok := err.(*export.GetContinuousExportNotFound); !ok {
t.Errorf("expected get to return not found but got %T %+v", err, err)
}
})

t.Run("get configured branch", func(t *testing.T) {
got, err := clt.Export.GetContinuousExport(&export.GetContinuousExportParams{
Repository: repo,
Branch: branch,
}, bauth)
if err != nil {
t.Fatalf("expected get to return result but got %s", err)
}
if diffs := deep.Equal(config, *got.GetPayload()); diffs != nil {
t.Errorf("got different configuration: %s", diffs)
}
})

t.Run("overwrite configuration", func(t *testing.T) {
newConfig := models.ContinuousExportConfiguration{
ExportPath: strfmt.URI("s3://better-bucket/export"),
ExportStatusPath: strfmt.URI("s3://better-bucket/report"),
LastKeysInPrefixRegexp: nil,
}
_, err := clt.Export.SetContinuousExport(&export.SetContinuousExportParams{
Repository: repo,
Branch: branch,
Config: &newConfig,
}, bauth)
if err != nil {
t.Errorf("failed to overwrite continuous export configuration: %s", err)
}
got, err := clt.Export.GetContinuousExport(&export.GetContinuousExportParams{
Repository: repo,
Branch: branch,
}, bauth)
if err != nil {
t.Fatalf("expected get to return result but got %s", err)
}
if diffs := deep.Equal(newConfig, *got.GetPayload()); diffs != nil {
t.Errorf("got different configuration: %s", diffs)
}
})
}

func Test_setupLakeFSHandler(t *testing.T) {
// get handler with DB without apply the DDL
handler, deps := getHandler(t, "", testutil.WithGetDBApplyDDL(false))
Expand Down
7 changes: 7 additions & 0 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ type Merger interface {
Merge(ctx context.Context, repository, leftBranch, rightBranch, committer, message string, metadata Metadata) (*MergeResult, error)
}

type ExportConfigurator interface {
GetExportConfigurationForBranch(repository string, branch string) (ExportConfiguration, error)
GetExportConfigurations() ([]ExportConfigurationForBranch, error)
PutExportConfiguration(repository string, branch string, conf *ExportConfiguration) error
}

type Cataloger interface {
RepositoryCataloger
BranchCataloger
Expand All @@ -160,6 +166,7 @@ type Cataloger interface {
MultipartUpdateCataloger
Differ
Merger
ExportConfigurator
io.Closer
}

Expand Down
94 changes: 94 additions & 0 deletions catalog/cataloger_export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package catalog

import (
"fmt"
"regexp"

"github.com/lib/pq"
"github.com/treeverse/lakefs/db"
)

// ExportConfiguration describes the export configuration of a branch, as passed on wire, used
// internally, and stored in DB.
type ExportConfiguration struct {
Path string `db:"export_path"`
StatusPath string `db:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"`
}

// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database.
// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of
// ExportConfiguration embedded here.
type ExportConfigurationForBranch struct {
Repository string `db:"repository"`
Branch string `db:"branch"`

Path string `db:"export_path"`
StatusPath string `db:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"`
}

func (c *cataloger) GetExportConfigurationForBranch(repository string, branch string) (ExportConfiguration, error) {
ret, err := c.db.Transact(func(tx db.Tx) (interface{}, error) {
branchID, err := c.getBranchIDCache(tx, repository, branch)
var ret ExportConfiguration
if err != nil {
return nil, err
}
err = c.db.Get(&ret,
`SELECT export_path, export_status_path, last_keys_in_prefix_regexp
FROM catalog_branches_export
WHERE branch_id = $1`, branchID)
return &ret, err
})
if ret == nil {
return ExportConfiguration{}, err
}
return *ret.(*ExportConfiguration), err
}

func (c *cataloger) GetExportConfigurations() ([]ExportConfigurationForBranch, error) {
ret := make([]ExportConfigurationForBranch, 0)
rows, err := c.db.Query(
`SELECT r.name repository, b.name branch,
e.export_path export_path, e.export_status_path export_status_path,
e.last_keys_in_prefix_regexp last_keys_in_prefix_regexp
FROM catalog_branches_export e JOIN catalog_branches b ON e.branch_id = b.id
JOIN catalog_repositories r ON b.repository_id = r.id`)
if err != nil {
return nil, err
}
for rows.Next() {
var rec ExportConfigurationForBranch
if err = rows.StructScan(&rec); err != nil {
return nil, fmt.Errorf("scan configuration %+v: %w", rows, err)
}
ret = append(ret, rec)
}
return ret, nil
}

func (c *cataloger) PutExportConfiguration(repository string, branch string, conf *ExportConfiguration) error {
// Validate all fields could be compiled as regexps.
for i, r := range conf.LastKeysInPrefixRegexp {
if _, err := regexp.Compile(r); err != nil {
return fmt.Errorf("invalid regexp /%s/ at position %d in LastKeysInPrefixRegexp: %w", r, i, err)
}
}
_, err := c.db.Transact(func(tx db.Tx) (interface{}, error) {
branchID, err := c.getBranchIDCache(tx, repository, branch)
if err != nil {
return nil, err
}
_, err = c.db.Exec(
`INSERT INTO catalog_branches_export (
branch_id, export_path, export_status_path, last_keys_in_prefix_regexp)
VALUES ($1, $2, $3, $4)
ON CONFLICT (branch_id)
DO UPDATE SET (branch_id, export_path, export_status_path, last_keys_in_prefix_regexp) =
(EXCLUDED.branch_id, EXCLUDED.export_path, EXCLUDED.export_status_path, EXCLUDED.last_keys_in_prefix_regexp)`,
branchID, conf.Path, conf.StatusPath, conf.LastKeysInPrefixRegexp)
return nil, err
})
return err
}
Loading