Skip to content

Commit

Permalink
generate status file on done
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-har committed Nov 9, 2020
1 parent 332f7c4 commit 7574553
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 43 deletions.
6 changes: 3 additions & 3 deletions catalog/cataloger_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
// ExportConfiguration describes the export configuration of a branch, as passed on wire, used
// internally, and stored in DB.
type ExportConfiguration struct {
Path string `db:"export_path"`
StatusPath string `db:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"`
Path string `db:"export_path" json:"path"`
StatusPath string `db:"export_status_path" json:"status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"regex"`
}

// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database.
Expand Down
18 changes: 9 additions & 9 deletions docs/assets/js/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,11 @@ definitions:
type: string
format: uri
x-nullable: false # Override https://github.com/go-swagger/go-swagger/issues/1188
# go-swagger totally not a bug. This causes the generated field
# *not* to be a pointer. Then the regular (incorrect, in this
# case) JSON parser parses it as an empty field, and validation
# verifies the value is non-empty. In *this particular case* it
# works because a URI cannot be empty (at least not an absolute
# go-swagger totally not a bug. This causes the generated field
# *not* to be a pointer. Then the regular (incorrect, in this
# case) JSON parser parses it as an empty field, and validation
# verifies the value is non-empty. In *this particular case* it
# works because a URI cannot be empty (at least not an absolute
# URI, which is what we require).
description: export objects to this path
example: s3://company-bucket/path/to/export
Expand Down Expand Up @@ -2043,10 +2043,10 @@ paths:

/repositories/{repository}/retention:
parameters:
- in: path
name: repository
required: true
type: string
- in: path
name: repository
required: true
type: string
put:
tags:
- retention
Expand Down
14 changes: 9 additions & 5 deletions export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo
commitRef := commit.Reference

err = cataloger.ExportState(repo, branch, commitRef, func(oldRef string, state catalog.CatalogBranchExportStatus) (newState catalog.CatalogBranchExportStatus, newMessage *string, err error) {
// Todo(guys) consider checking commitRef bigger than or equal oldRef
// Todo(guys) consider checking if branch configured and fail here
config, err := cataloger.GetExportConfigurationForBranch(repo, branch)
if err != nil {
msg := "failed to get export configuration for branch"
return catalog.ExportStatusFailed, &msg, err
}

exportID, err := getExportID(repo, branch, commitRef)
if err != nil {
msg := "failed generating ID"
return catalog.ExportStatusFailed, &msg, err
}
tasks, err := GetStartTasks(repo, branch, oldRef, commitRef, exportID)
tasks, err := GetStartTasks(repo, branch, oldRef, commitRef, exportID, config)
if err != nil {
msg := "failed creating start task"
return catalog.ExportStatusFailed, &msg, err
Expand All @@ -53,9 +57,9 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo
}

// ExportBranchDone ends the export branch process by changing the status
func ExportBranchDone(cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, repo, branch, commitRef string) error {
func ExportBranchDone(cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error {
err := cataloger.ExportState(repo, branch, commitRef, func(oldRef string, state catalog.CatalogBranchExportStatus) (newState catalog.CatalogBranchExportStatus, newMessage *string, err error) {
return status, nil, nil
return status, statusMsg, nil
})
return err
}
46 changes: 31 additions & 15 deletions export/export_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,11 @@ func (h *Handler) start(body *string) error {
return err
}

config, err := h.cataloger.GetExportConfigurationForBranch(startData.Repo, startData.Branch)
finishBodyStr, err := getFinishBodyString(startData.Repo, startData.Branch, startData.ToCommitRef, startData.ExportConfig.StatusPath)
if err != nil {
return err
}

finishBodyStr, err := getFinishBodyString(startData.Repo, startData.Branch, startData.ToCommitRef)
if err != nil {
return err
}
return h.generateTasks(startData, config, &finishBodyStr)
return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr)
}

func (h *Handler) generateTasks(startData StartData, config catalog.ExportConfiguration, finishBodyStr *string) error {
Expand Down Expand Up @@ -150,11 +145,12 @@ func getGenerateSuccess(lastKeysInPrefixRegexp []string) func(path string) bool
}
}

func getFinishBodyString(repo, branch, commitRef string) (string, error) {
func getFinishBodyString(repo, branch, commitRef, statusPath string) (string, error) {
finishData := FinishData{
Repo: repo,
Branch: branch,
CommitRef: commitRef,
Repo: repo,
Branch: branch,
CommitRef: commitRef,
StatusPath: statusPath,
}
finisBody, err := json.Marshal(finishData)
if err != nil {
Expand Down Expand Up @@ -206,18 +202,38 @@ func (h *Handler) touch(body *string) error {
return h.adapter.Put(path, 0, strings.NewReader(""), block.PutOpts{})
}

func (h *Handler) done(body *string) error {
func getStatus(signalledErrors int) (catalog.CatalogBranchExportStatus, *string) {
if signalledErrors > 0 {
msg := fmt.Sprintf("%d tasks failed\n", signalledErrors)
return catalog.ExportStatusFailed, &msg
}
return catalog.ExportStatusSuccess, nil
}
func (h *Handler) done(body *string, signalledErrors int) error {
var finishData FinishData
err := json.Unmarshal([]byte(*body), &finishData)
if err != nil {
return err
}
return ExportBranchDone(h.cataloger, catalog.ExportStatusSuccess, finishData.Repo, finishData.Branch, finishData.CommitRef)

status, msg := getStatus(signalledErrors)
fileName := fmt.Sprintf("%s-%s-%s", finishData.Repo, finishData.Branch, finishData.CommitRef)
path, err := PathToPointer(fmt.Sprintf("%s/%s", finishData.StatusPath, fileName))
if err != nil {
return err
}
data := fmt.Sprintf("status: %s, signalled_errors: %d\n", status, signalledErrors)
reader := strings.NewReader(data)
err = h.adapter.Put(path, reader.Size(), reader, block.PutOpts{})
if err != nil {
return err
}
return ExportBranchDone(h.cataloger, status, msg, finishData.Branch, finishData.CommitRef, finishData.Repo)
}

var errUnknownAction = errors.New("unknown action")

func (h *Handler) Handle(action string, body *string) parade.ActorResult {
func (h *Handler) Handle(action string, body *string, signalledErrors int) parade.ActorResult {
var err error
switch action {
case StartAction:
Expand All @@ -229,7 +245,7 @@ func (h *Handler) Handle(action string, body *string) parade.ActorResult {
case TouchAction:
err = h.touch(body)
case DoneAction:
err = h.done(body)
err = h.done(body, signalledErrors)
default:
err = errUnknownAction
}
Expand Down
6 changes: 3 additions & 3 deletions export/export_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestCopy(t *testing.T) {
Action: CopyAction,
Body: &taskBodyStr,
}
if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted {
if res := h.Handle(task.Action, task.Body, task.NumSignalledFailures); res.StatusCode != parade.TaskCompleted {
t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode)
}
// read Destination
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestDelete(t *testing.T) {
Action: DeleteAction,
Body: &taskBodyStr,
}
if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted {
if res := h.Handle(task.Action, task.Body, task.NumSignalledFailures); res.StatusCode != parade.TaskCompleted {
t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode)
}
// read Destination
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestTouch(t *testing.T) {
Action: TouchAction,
Body: &taskBodyStr,
}
if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted {
if res := h.Handle(task.Action, task.Body, task.NumSignalledFailures); res.StatusCode != parade.TaskCompleted {
t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode)
}
// read Destination
Expand Down
11 changes: 7 additions & 4 deletions export/tasks_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type StartData struct {
FromCommitRef string `json:"from"`
ToCommitRef string `json:"to"`
ExportID string `json:"export_id"`
ExportConfig catalog.ExportConfiguration
}

type CopyData struct {
Expand All @@ -52,9 +53,10 @@ type SuccessData struct {
}

type FinishData struct {
Repo string `json:"repo"`
Branch string `json:"branch"`
CommitRef string `json:"commitRef"`
Repo string `json:"repo"`
Branch string `json:"branch"`
CommitRef string `json:"commitRef"`
StatusPath string `json:"status_path"`
}

// Returns the "dirname" of path: everything up to the last "/" (excluding that slash). If
Expand Down Expand Up @@ -260,14 +262,15 @@ type TasksGenerator struct {
successTasksGenerator SuccessTasksTreeGenerator
}

func GetStartTasks(repo, branch, fromCommitRef, toCommitRef, exportID string) ([]parade.TaskData, error) {
func GetStartTasks(repo, branch, fromCommitRef, toCommitRef, exportID string, config catalog.ExportConfiguration) ([]parade.TaskData, error) {
one, zero := 1, 0
data := StartData{
Repo: repo,
Branch: branch,
FromCommitRef: fromCommitRef,
ToCommitRef: toCommitRef,
ExportID: exportID,
ExportConfig: config,
}
body, err := json.Marshal(data)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion parade/action_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (a *workerPool) start() {
workerID := uuid.New()
defer a.wg.Done()
for task := range a.ch {
res := a.actor.Handle(task.Action, task.Body)
res := a.actor.Handle(task.Action, task.Body, task.NumSignalledFailures)
err := a.parade.ReturnTask(task.ID, task.Token, res.Status, res.StatusCode)
if err != nil {
logging.Default().WithFields(logging.Fields{
Expand Down
5 changes: 3 additions & 2 deletions parade/action_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package parade_test

import (
"github.com/treeverse/lakefs/parade"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/treeverse/lakefs/parade"
)

type ownTaskResult struct {
Expand Down Expand Up @@ -51,7 +52,7 @@ type mockHandler struct {
handleCalled int32
}

func (m *mockHandler) Handle(_ string, _ *string) parade.ActorResult {
func (m *mockHandler) Handle(_ string, _ *string, _ int) parade.ActorResult {
atomic.AddInt32(&m.handleCalled, 1)
return parade.ActorResult{}
}
Expand Down
2 changes: 1 addition & 1 deletion parade/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type ActorResult struct {
// Actor handles an action or a group of actions
type Actor interface {
// Handle performs actions with the given body and return the ActorResult
Handle(action string, body *string) ActorResult
Handle(action string, body *string, signalledErrors int) ActorResult
// Actions returns the list of actions that could be performed by the Actor
Actions() []string
// ActorID returns the ID of the actor
Expand Down

0 comments on commit 7574553

Please sign in to comment.