Skip to content

Commit

Permalink
Add continuous export configuration
Browse files Browse the repository at this point in the history
(Doesn't do anything yet)
  • Loading branch information
arielshaqed committed Nov 17, 2020
1 parent 34185da commit 1ee9a61
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 13 deletions.
2 changes: 2 additions & 0 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2211,6 +2211,7 @@ func (c *Controller) ExportGetContinuousExportHandler() exportop.GetContinuousEx
ExportPath: strfmt.URI(config.Path),
ExportStatusPath: strfmt.URI(config.StatusPath),
LastKeysInPrefixRegexp: config.LastKeysInPrefixRegexp,
IsContinuous: config.IsContinuous,
}
return exportop.NewGetContinuousExportOK().WithPayload(&payload)
})
Expand Down Expand Up @@ -2258,6 +2259,7 @@ func (c *Controller) ExportSetContinuousExportHandler() exportop.SetContinuousEx
Path: params.Config.ExportPath.String(),
StatusPath: params.Config.ExportStatusPath.String(),
LastKeysInPrefixRegexp: params.Config.LastKeysInPrefixRegexp,
IsContinuous: params.Config.IsContinuous,
}
err = deps.Cataloger.PutExportConfiguration(params.Repository, params.Branch, &config)
if errors.Is(err, catalog.ErrRepositoryNotFound) || errors.Is(err, catalog.ErrBranchNotFound) {
Expand Down
17 changes: 10 additions & 7 deletions catalog/cataloger_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ExportConfiguration struct {
Path string `db:"export_path" json:"export_path"`
StatusPath string `db:"export_status_path" json:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"`
IsContinuous bool `db:"continuous" json:"is_continuous"`
}

// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database.
Expand All @@ -30,6 +31,7 @@ type ExportConfigurationForBranch struct {
Path string `db:"export_path"`
StatusPath string `db:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"`
IsContinuous bool `db:"continuous"`
}

type CatalogBranchExportStatus string
Expand Down Expand Up @@ -83,7 +85,7 @@ func (c *cataloger) GetExportConfigurationForBranch(repository string, branch st
return nil, err
}
err = c.db.Get(&ret,
`SELECT export_path, export_status_path, last_keys_in_prefix_regexp
`SELECT export_path, export_status_path, last_keys_in_prefix_regexp, continuous
FROM catalog_branches_export
WHERE branch_id = $1`, branchID)
return &ret, err
Expand All @@ -99,7 +101,8 @@ func (c *cataloger) GetExportConfigurations() ([]ExportConfigurationForBranch, e
rows, err := c.db.Query(
`SELECT r.name repository, b.name branch,
e.export_path export_path, e.export_status_path export_status_path,
e.last_keys_in_prefix_regexp last_keys_in_prefix_regexp
e.last_keys_in_prefix_regexp last_keys_in_prefix_regexp,
e.continuous continuous
FROM catalog_branches_export e JOIN catalog_branches b ON e.branch_id = b.id
JOIN catalog_repositories r ON b.repository_id = r.id`)
if err != nil {
Expand All @@ -123,12 +126,12 @@ func (c *cataloger) PutExportConfiguration(repository string, branch string, con
}
_, err = c.db.Exec(
`INSERT INTO catalog_branches_export (
branch_id, export_path, export_status_path, last_keys_in_prefix_regexp)
VALUES ($1, $2, $3, $4)
branch_id, export_path, export_status_path, last_keys_in_prefix_regexp, continuous)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (branch_id)
DO UPDATE SET (branch_id, export_path, export_status_path, last_keys_in_prefix_regexp) =
(EXCLUDED.branch_id, EXCLUDED.export_path, EXCLUDED.export_status_path, EXCLUDED.last_keys_in_prefix_regexp)`,
branchID, conf.Path, conf.StatusPath, conf.LastKeysInPrefixRegexp)
DO UPDATE SET (branch_id, export_path, export_status_path, last_keys_in_prefix_regexp, continuous) =
(EXCLUDED.branch_id, EXCLUDED.export_path, EXCLUDED.export_status_path, EXCLUDED.last_keys_in_prefix_regexp, EXCLUDED.continuous)`,
branchID, conf.Path, conf.StatusPath, conf.LastKeysInPrefixRegexp, conf.IsContinuous)
return nil, err
})
return err
Expand Down
19 changes: 19 additions & 0 deletions catalog/cataloger_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,25 @@ func TestExportConfiguration(t *testing.T) {
}
})

t.Run("continuous", func(t *testing.T) {
newCfg := ExportConfiguration{
Path: "/better/to/export",
StatusPath: "/better/for/status",
LastKeysInPrefixRegexp: pq.StringArray{"abc", "def", "xyz"},
IsContinuous: true,
}
if err := c.PutExportConfiguration(repo, defaultBranch, &newCfg); err != nil {
t.Fatalf("update configuration with %+v: %s", newCfg, err)
}
gotCfg, err := c.GetExportConfigurationForBranch(repo, defaultBranch)
if err != nil {
t.Errorf("get updated configuration for configured branch failed: %s", err)
}
if diffs := deep.Equal(newCfg, gotCfg); diffs != nil {
t.Errorf("got other configuration than expected: %s", diffs)
}
})

t.Run("invalid regexp", func(t *testing.T) {
badCfg := ExportConfiguration{
Path: "/better/to/export",
Expand Down
22 changes: 18 additions & 4 deletions cmd/lakectl/cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var exportCmd = &cobra.Command{
var exportSetCmd = &cobra.Command{
Use: "set <branch uri>",
Short: "set continuous export configuration for branch",
Long: `Set the entire continuous export configuration for branch.
Overrides all fields of any previous configuration.`,
Run: func(cmd *cobra.Command, args []string) {
client := getClient()
branchURI := uri.Must(uri.Parse(args[0]))
Expand All @@ -36,10 +38,15 @@ var exportSetCmd = &cobra.Command{
if err != nil {
DieErr(err)
}
isContinuous, err := cmd.Flags().GetBool("continuous")
if err != nil {
DieErr(err)
}
config := &models.ContinuousExportConfiguration{
ExportPath: strfmt.URI(exportPath),
ExportStatusPath: strfmt.URI(exportStatusPath),
LastKeysInPrefixRegexp: prefixRegex,
IsContinuous: isContinuous,
}
err = client.SetContinuousExport(context.Background(), branchURI.Repository, branchURI.Ref, config)
if err != nil {
Expand All @@ -53,7 +60,7 @@ var exportConfigurationTemplate = `export configuration for branch "{{.Branch.Re
Export Path: {{.Configuration.ExportPath|yellow}}
Export status path: {{.Configuration.ExportStatusPath}}
Last Keys In Prefix Regexp: {{.Configuration.LastKeysInPrefixRegexp}}
{{.ContinuousMarker}}
`

// exportGetCmd get continuous export configuration for branch
Expand All @@ -68,10 +75,15 @@ var exportGetCmd = &cobra.Command{
if err != nil {
DieErr(err)
}
continuousMarker := ""
if configuration.IsContinuous {
continuousMarker = "Continuously exported\n"
}
Write(exportConfigurationTemplate, struct {
Branch *uri.URI
Configuration *models.ContinuousExportConfiguration
}{branchURI, configuration})
Branch *uri.URI
Configuration *models.ContinuousExportConfiguration
ContinuousMarker string
}{branchURI, configuration, continuousMarker})
},
}

Expand Down Expand Up @@ -100,5 +112,7 @@ func init() {
exportSetCmd.Flags().String("path", "", "export objects to this path")
exportSetCmd.Flags().String("status-path", "", "write export status object to this path")
exportSetCmd.Flags().StringArray("prefix-regex", nil, "list of regexps of keys to exported last in each prefix (for signalling)")
exportSetCmd.Flags().Bool("continuous", false, "export branch after every commit or merge (...=false to disable)")
_ = exportSetCmd.MarkFlagRequired("path")
_ = exportSetCmd.MarkFlagRequired("continuous")
}
7 changes: 7 additions & 0 deletions ddl/000012_export_continuous.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
BEGIN;

ALTER TABLE catalog_branches_export ADD COLUMN continuous BOOLEAN;
UPDATE catalog_branches_export SET continuous=false;
ALTER TABLE catalog_branches_export ALTER COLUMN continuous SET NOT NULL;

END;
1 change: 1 addition & 0 deletions ddl/0012_export_continuous.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE catalog_branches_export DROP COLUMN continuous;
8 changes: 6 additions & 2 deletions docs/assets/js/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ definitions:
type: string
description: "list of regexps of keys to exported last in each prefix (for signalling)"
example: [ "^SUCCESS$", ".*/_SUCCESS$" ]
isContinuous:
type: boolean
description: if true, export every commit or merge to branch

retention_policy:
type: object
Expand Down Expand Up @@ -2026,10 +2029,10 @@ paths:
- export
- branches
operationId: run
summary: export branch
summary: hook to be called in order to execute continuous export on branch
responses:
201:
description: export successfully started
description: continuous export successfully started
schema:
description: "export ID"
type: string
Expand Down Expand Up @@ -2111,3 +2114,4 @@ paths:
$ref: "#/definitions/config"
401:
$ref: "#/responses/Unauthorized"

3 changes: 3 additions & 0 deletions swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ definitions:
type: string
description: "list of regexps of keys to exported last in each prefix (for signalling)"
example: [ "^SUCCESS$", ".*/_SUCCESS$" ]
isContinuous:
type: boolean
description: if true, export every commit or merge to branch

retention_policy:
type: object
Expand Down

0 comments on commit 1ee9a61

Please sign in to comment.