Skip to content

Commit

Permalink
Configure branch continuous export: Swagger defs and API handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
arielshaqed committed Oct 19, 2020
1 parent a34dcf4 commit 4032a01
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 48 deletions.
94 changes: 94 additions & 0 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ import (
"fmt"
"net/http"
"path/filepath"
"regexp"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/go-openapi/runtime"
"github.com/go-openapi/runtime/middleware"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/treeverse/lakefs/api/gen/models"
"github.com/treeverse/lakefs/api/gen/restapi/operations"
authop "github.com/treeverse/lakefs/api/gen/restapi/operations/auth"
"github.com/treeverse/lakefs/api/gen/restapi/operations/branches"
"github.com/treeverse/lakefs/api/gen/restapi/operations/commits"
configop "github.com/treeverse/lakefs/api/gen/restapi/operations/config"
exportop "github.com/treeverse/lakefs/api/gen/restapi/operations/export"
hcop "github.com/treeverse/lakefs/api/gen/restapi/operations/health_check"
metadataop "github.com/treeverse/lakefs/api/gen/restapi/operations/metadata"
"github.com/treeverse/lakefs/api/gen/restapi/operations/objects"
Expand Down Expand Up @@ -184,8 +187,12 @@ func (c *Controller) Configure(api *operations.LakefsAPI) {

api.RetentionGetRetentionPolicyHandler = c.RetentionGetRetentionPolicyHandler()
api.RetentionUpdateRetentionPolicyHandler = c.RetentionUpdateRetentionPolicyHandler()

api.MetadataCreateSymlinkHandler = c.MetadataCreateSymlinkHandler()

api.ExportGetContinuousExportHandler = c.ExportGetContinuousExportHandler()
api.ExportSetContinuousExportHandler = c.ExportSetContinuousExportHandler()

api.ConfigGetConfigHandler = c.ConfigGetConfigHandler()
}

Expand Down Expand Up @@ -2179,6 +2186,93 @@ func (c *Controller) DetachPolicyFromGroupHandler() authop.DetachPolicyFromGroup
})
}

func (c *Controller) ExportGetContinuousExportHandler() exportop.GetContinuousExportHandler {
return exportop.GetContinuousExportHandlerFunc(func(params exportop.GetContinuousExportParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
{
Action: permissions.ListBranchesAction,
Resource: permissions.BranchArn(params.Repository, params.Branch),
},
})
if err != nil {
return exportop.NewGetContinuousExportUnauthorized().
WithPayload(responseErrorFrom(err))
}

deps.LogAction("get_continuous_export")

config, err := deps.Cataloger.GetExportConfigurationForBranch(params.Repository, params.Branch)
if errors.Is(err, catalog.ErrRepositoryNotFound) || errors.Is(err, catalog.ErrBranchNotFound) {
return exportop.NewGetContinuousExportNotFound().
WithPayload(responseErrorFrom(err))
}
if err != nil {
return exportop.NewGetContinuousExportDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}

prefixRegexp, err := regexp.Compile(config.LastKeysInPrefixRegexp)
if err != nil {
return exportop.NewGetContinuousExportDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}

prefixRegexps, err := catalog.DeconstructDisjunction(prefixRegexp)
if err != nil {
return exportop.NewGetContinuousExportDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(fmt.Errorf(
"cannot deconstruct last_keys_in_prefix_regexp %s: %w", config.LastKeysInPrefixRegexp, err)))
}

payload := models.ContinuousExportConfiguration{
ExportPath: strfmt.URI(config.Path),
ExportStatusPath: strfmt.URI(config.StatusPath),
LastKeysInPrefixRegexp: prefixRegexps,
}
return exportop.NewGetContinuousExportOK().WithPayload(&payload)

})
}

func (c *Controller) ExportSetContinuousExportHandler() exportop.SetContinuousExportHandlerFunc {
return exportop.SetContinuousExportHandlerFunc(func(params exportop.SetContinuousExportParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
{
Action: permissions.CreateBranchAction,
Resource: permissions.BranchArn(params.Repository, params.Branch),
},
})
if err != nil {
return exportop.NewSetContinuousExportUnauthorized().
WithPayload(responseErrorFrom(err))
}

deps.LogAction("set_continuous_export")

lastKeysInPrefixRegexp, err := catalog.DisjunctRegexps(params.Config.LastKeysInPrefixRegexp)
if err != nil {
return exportop.NewSetContinuousExportDefault(http.StatusInternalServerError).
WithPayload(responseError("join last keys in prefix regexps: %s", err))
}
config := catalog.ExportConfiguration{
Path: params.Config.ExportPath.String(),
StatusPath: params.Config.ExportStatusPath.String(),
LastKeysInPrefixRegexp: lastKeysInPrefixRegexp.String(),
}
err = deps.Cataloger.PutExportConfiguration(params.Repository, params.Branch, &config)
if errors.Is(err, catalog.ErrRepositoryNotFound) || errors.Is(err, catalog.ErrBranchNotFound) {
return exportop.NewSetContinuousExportNotFound().
WithPayload(responseErrorFrom(err))
}
if err != nil {
return exportop.NewSetContinuousExportDefault(http.StatusInternalServerError).
WithPayload(responseErrorFrom(err))
}

return exportop.NewSetContinuousExportCreated()
})
}

func (c *Controller) RetentionGetRetentionPolicyHandler() retentionop.GetRetentionPolicyHandler {
return retentionop.GetRetentionPolicyHandlerFunc(func(params retentionop.GetRetentionPolicyParams, user *models.User) middleware.Responder {
deps, err := c.setupRequest(user, params.HTTPRequest, []permissions.Permission{
Expand Down
89 changes: 89 additions & 0 deletions api/api_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (

"github.com/go-openapi/runtime"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
"github.com/go-openapi/swag"
"github.com/go-test/deep"
"github.com/treeverse/lakefs/api/gen/client"
"github.com/treeverse/lakefs/api/gen/client/auth"
"github.com/treeverse/lakefs/api/gen/client/branches"
"github.com/treeverse/lakefs/api/gen/client/commits"
"github.com/treeverse/lakefs/api/gen/client/config"
"github.com/treeverse/lakefs/api/gen/client/export"
"github.com/treeverse/lakefs/api/gen/client/objects"
"github.com/treeverse/lakefs/api/gen/client/repositories"
"github.com/treeverse/lakefs/api/gen/client/retention"
Expand Down Expand Up @@ -1354,6 +1356,93 @@ func TestHandler_ConfigHandlers(t *testing.T) {
})
}

func TestHandler_ContinuousExportHandlers(t *testing.T) {
const (
repo = "repo-for-continuous-export-test"
branch = "main"
anotherBranch = "notMain"
)
handler, deps := getHandler(t, "")

creds := createDefaultAdminUser(deps.auth, t)
bauth := httptransport.BasicAuth(creds.AccessKeyID, creds.AccessSecretKey)

clt := client.Default
clt.SetTransport(&handlerTransport{Handler: handler})

ctx := context.Background()
_, err := deps.cataloger.CreateRepository(ctx, repo, "s3://foo1", branch)
testutil.MustDo(t, "create repository", err)

config := models.ContinuousExportConfiguration{
ExportPath: strfmt.URI("s3://bucket/export"),
ExportStatusPath: strfmt.URI("s3://bucket/report"),
LastKeysInPrefixRegexp: []string{"^_success$", ".*/_success$"},
}

res, err := clt.Export.SetContinuousExport(&export.SetContinuousExportParams{
Repository: repo,
Branch: branch,
Config: &config,
}, bauth)
testutil.MustDo(t, "initial continuous export configuration", err)
if res == nil {
t.Fatalf("initial continuous export configuration: expected OK but got nil")
}

t.Run("get missing branch configuration", func(t *testing.T) {
res, err := clt.Export.GetContinuousExport(&export.GetContinuousExportParams{
Repository: repo,
Branch: anotherBranch,
}, bauth)
if err == nil || res != nil {
t.Fatalf("expected get to return an error but got result %v, error nil", res)
}
if _, ok := err.(*export.GetContinuousExportNotFound); !ok {
t.Errorf("expected get to return not found but got %T %+v", err, err)
}
})

t.Run("get configured branch", func(t *testing.T) {
got, err := clt.Export.GetContinuousExport(&export.GetContinuousExportParams{
Repository: repo,
Branch: branch,
}, bauth)
if err != nil {
t.Fatalf("expected get to return result but got %s", err)
}
if diffs := deep.Equal(config, *got.GetPayload()); diffs != nil {
t.Errorf("got different configuration: %s", diffs)
}
})

t.Run("overwrite configuration", func(t *testing.T) {
newConfig := models.ContinuousExportConfiguration{
ExportPath: strfmt.URI("s3://better-bucket/export"),
ExportStatusPath: strfmt.URI("s3://better-bucket/report"),
LastKeysInPrefixRegexp: nil,
}
_, err := clt.Export.SetContinuousExport(&export.SetContinuousExportParams{
Repository: repo,
Branch: branch,
Config: &newConfig,
}, bauth)
if err != nil {
t.Errorf("failed to overwrite continuous export configuration: %s", err)
}
got, err := clt.Export.GetContinuousExport(&export.GetContinuousExportParams{
Repository: repo,
Branch: branch,
}, bauth)
if err != nil {
t.Fatalf("expected get to return result but got %s", err)
}
if diffs := deep.Equal(newConfig, *got.GetPayload()); diffs != nil {
t.Errorf("got different configuration: %s", diffs)
}
})
}

func Test_setupLakeFSHandler(t *testing.T) {
// get handler with DB without apply the DDL
handler, deps := getHandler(t, "", testutil.WithGetDBApplyDDL(false))
Expand Down
133 changes: 85 additions & 48 deletions docs/assets/js/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,35 @@ definitions:
- id
- statement

continuous_export_configuration:
type: object
required:
- exportPath
properties:
exportPath:
type: string
format: uri
x-nullable: false # Override https://github.com/go-swagger/go-swagger/issues/1188
# go-swagger totally not a bug. This causes the generated field
# *not* to be a pointer. Then the regular (incorrect, in this
# case) JSON parser parses it as an empty field, and validation
# verifies the value is non-empty. In *this particular case* it
# works because a URI cannot be empty (at least not an absolute
# URI, which is what we require).
description: export objects to this path
example: s3://company-bucket/path/to/export
exportStatusPath:
type: string
format: uri
description: write export status object to this path
example: s3://company-bucket/path/to/status
lastKeysInPrefixRegexp:
type: array
items:
type: string
description: "list of regexps of keys to exported last in each prefix (for signalling)"
example: ["^SUCCESS$", ".*/_SUCCESS$"]

retention_policy:
type: object
required:
Expand Down Expand Up @@ -1258,54 +1287,6 @@ paths:
schema:
$ref: "#/definitions/error"

/repositories/{repository}/inventory/s3/import:
parameters:
- in: path
name: repository
required: true
type: string
- in: query
name: manifestUrl
required: true
type: string
- in: query
name: dryRun
type: boolean
default: false
post:
tags:
- repositories
operationId: importFromS3Inventory
summary: import metadata for an existing bucket in S3
responses:
201:
description: import results
schema:
type: object
properties:
is_dry_run:
type: boolean
added_or_changed:
type: integer
format: int64
deleted:
type: integer
format: int64
previous_manifest:
type: string
previous_import_date:
type: integer
format: int64
401:
$ref: "#/responses/Unauthorized"
404:
description: "repository not found"
schema:
$ref: "#/definitions/error"
default:
description: generic error response
schema:
$ref: "#/definitions/error"
/repositories/{repository}/branches:
parameters:
- in: path
Expand Down Expand Up @@ -1972,6 +1953,62 @@ paths:
schema:
$ref: "#/definitions/error"

/repositories/{repository}/branches/{branch}/continuous-export:
parameters:
- in: path
name: repository
required: true
type: string
- in: path
name: branch
required: true
type: string
get:
tags:
- export
- branches
operationId: getContinuousExport
summary: returns the current continuous export configuration of a branch
responses:
200:
description: continuous export policy
schema:
$ref: "#/definitions/continuous_export_configuration"
401:
$ref: "#/responses/Unauthorized"
404:
description: no continuous export policy defined
schema:
$ref: "#/definitions/error"
default:
description: generic error response
schema:
$ref: "#/definitions/error"
put:
tags:
- export
- branches
operationId: setContinuousExport
summary: sets a new continuous export configuration of a branch
parameters:
- in: body
name: config
required: true
schema:
$ref: "#/definitions/continuous_export_configuration"
responses:
201:
description: continuous export successfullyconfigured
401:
$ref: "#/responses/Unauthorized"
404:
description: no branch defined at that repo
schema:
$ref: "#/definitions/error"
default:
description: generic error response
schema:
$ref: "#/definitions/error"

/repositories/{repository}/retention:
parameters:
Expand Down
Loading

0 comments on commit 4032a01

Please sign in to comment.