diff --git a/catalog/cataloger_export.go b/catalog/cataloger_export.go index 0b15a1481c1..51e2432d203 100644 --- a/catalog/cataloger_export.go +++ b/catalog/cataloger_export.go @@ -15,9 +15,9 @@ import ( // ExportConfiguration describes the export configuration of a branch, as passed on wire, used // internally, and stored in DB. type ExportConfiguration struct { - Path string `db:"export_path"` - StatusPath string `db:"export_status_path"` - LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"` + Path string `db:"export_path" json:"path"` + StatusPath string `db:"export_status_path" json:"status_path"` + LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"regex"` } // ExportConfigurationForBranch describes how to export BranchID. It is stored in the database. diff --git a/docs/assets/js/swagger.yml b/docs/assets/js/swagger.yml index aeca552ee9f..a5919eb2847 100644 --- a/docs/assets/js/swagger.yml +++ b/docs/assets/js/swagger.yml @@ -318,11 +318,11 @@ definitions: type: string format: uri x-nullable: false # Override https://github.com/go-swagger/go-swagger/issues/1188 - # go-swagger totally not a bug. This causes the generated field - # *not* to be a pointer. Then the regular (incorrect, in this - # case) JSON parser parses it as an empty field, and validation - # verifies the value is non-empty. In *this particular case* it - # works because a URI cannot be empty (at least not an absolute + # go-swagger totally not a bug. This causes the generated field + # *not* to be a pointer. Then the regular (incorrect, in this + # case) JSON parser parses it as an empty field, and validation + # verifies the value is non-empty. In *this particular case* it + # works because a URI cannot be empty (at least not an absolute # URI, which is what we require). 
description: export objects to this path example: s3://company-bucket/path/to/export @@ -2043,10 +2043,10 @@ paths: /repositories/{repository}/retention: parameters: - - in: path - name: repository - required: true - type: string + - in: path + name: repository + required: true + type: string put: tags: - retention diff --git a/export/export.go b/export/export.go index 6536fb27849..ab33df869f2 100644 --- a/export/export.go +++ b/export/export.go @@ -29,14 +29,18 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo commitRef := commit.Reference err = cataloger.ExportState(repo, branch, commitRef, func(oldRef string, state catalog.CatalogBranchExportStatus) (newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { - // Todo(guys) consider checking commitRef bigger than or equal oldRef - // Todo(guys) consider checking if branch configured and fail here + config, err := cataloger.GetExportConfigurationForBranch(repo, branch) + if err != nil { + msg := "failed to get export configuration for branch" + return catalog.ExportStatusFailed, &msg, err + } + exportID, err := getExportID(repo, branch, commitRef) if err != nil { msg := "failed generating ID" return catalog.ExportStatusFailed, &msg, err } - tasks, err := GetStartTasks(repo, branch, oldRef, commitRef, exportID) + tasks, err := GetStartTasks(repo, branch, oldRef, commitRef, exportID, config) if err != nil { msg := "failed creating start task" return catalog.ExportStatusFailed, &msg, err @@ -53,9 +57,9 @@ func ExportBranchStart(paradeDB parade.Parade, cataloger catalog.Cataloger, repo } // ExportBranchDone ends the export branch process by changing the status -func ExportBranchDone(cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, repo, branch, commitRef string) error { +func ExportBranchDone(cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error { err := 
cataloger.ExportState(repo, branch, commitRef, func(oldRef string, state catalog.CatalogBranchExportStatus) (newState catalog.CatalogBranchExportStatus, newMessage *string, err error) { - return status, nil, nil + return status, statusMsg, nil }) return err } diff --git a/export/export_handler.go b/export/export_handler.go index 4d73e76fb1f..26acf62e193 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -57,16 +57,11 @@ func (h *Handler) start(body *string) error { return err } - config, err := h.cataloger.GetExportConfigurationForBranch(startData.Repo, startData.Branch) + finishBodyStr, err := getFinishBodyString(startData.Repo, startData.Branch, startData.ToCommitRef, startData.ExportConfig.StatusPath) if err != nil { return err } - - finishBodyStr, err := getFinishBodyString(startData.Repo, startData.Branch, startData.ToCommitRef) - if err != nil { - return err - } - return h.generateTasks(startData, config, &finishBodyStr) + return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr) } func (h *Handler) generateTasks(startData StartData, config catalog.ExportConfiguration, finishBodyStr *string) error { @@ -150,11 +145,12 @@ func getGenerateSuccess(lastKeysInPrefixRegexp []string) func(path string) bool } } -func getFinishBodyString(repo, branch, commitRef string) (string, error) { +func getFinishBodyString(repo, branch, commitRef, statusPath string) (string, error) { finishData := FinishData{ - Repo: repo, - Branch: branch, - CommitRef: commitRef, + Repo: repo, + Branch: branch, + CommitRef: commitRef, + StatusPath: statusPath, } finisBody, err := json.Marshal(finishData) if err != nil { @@ -206,18 +202,38 @@ func (h *Handler) touch(body *string) error { return h.adapter.Put(path, 0, strings.NewReader(""), block.PutOpts{}) } -func (h *Handler) done(body *string) error { +func getStatus(signalledErrors int) (catalog.CatalogBranchExportStatus, *string) { + if signalledErrors > 0 { + msg := fmt.Sprintf("%d tasks failed\n", 
signalledErrors) + return catalog.ExportStatusFailed, &msg + } + return catalog.ExportStatusSuccess, nil +} +func (h *Handler) done(body *string, signalledErrors int) error { var finishData FinishData err := json.Unmarshal([]byte(*body), &finishData) if err != nil { return err } - return ExportBranchDone(h.cataloger, catalog.ExportStatusSuccess, finishData.Repo, finishData.Branch, finishData.CommitRef) + + status, msg := getStatus(signalledErrors) + fileName := fmt.Sprintf("%s-%s-%s", finishData.Repo, finishData.Branch, finishData.CommitRef) + path, err := PathToPointer(fmt.Sprintf("%s/%s", finishData.StatusPath, fileName)) + if err != nil { + return err + } + data := fmt.Sprintf("status: %s, signalled_errors: %d\n", status, signalledErrors) + reader := strings.NewReader(data) + err = h.adapter.Put(path, reader.Size(), reader, block.PutOpts{}) + if err != nil { + return err + } + return ExportBranchDone(h.cataloger, status, msg, finishData.Repo, finishData.Branch, finishData.CommitRef) } var errUnknownAction = errors.New("unknown action") -func (h *Handler) Handle(action string, body *string) parade.ActorResult { +func (h *Handler) Handle(action string, body *string, signalledErrors int) parade.ActorResult { var err error switch action { case StartAction: @@ -229,7 +245,7 @@ func (h *Handler) Handle(action string, body *string) parade.ActorResult { case TouchAction: err = h.touch(body) case DoneAction: - err = h.done(body) + err = h.done(body, signalledErrors) default: err = errUnknownAction } diff --git a/export/export_handler_test.go b/export/export_handler_test.go index b5072d17d1f..fe78d74fd2a 100644 --- a/export/export_handler_test.go +++ b/export/export_handler_test.go @@ -46,7 +46,7 @@ func TestCopy(t *testing.T) { Action: CopyAction, Body: &taskBodyStr, } - if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { + if res := h.Handle(task.Action, task.Body, task.NumSignalledFailures); res.StatusCode != parade.TaskCompleted { 
t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) } // read Destination @@ -94,7 +94,7 @@ func TestDelete(t *testing.T) { Action: DeleteAction, Body: &taskBodyStr, } - if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { + if res := h.Handle(task.Action, task.Body, task.NumSignalledFailures); res.StatusCode != parade.TaskCompleted { t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) } // read Destination @@ -132,7 +132,7 @@ func TestTouch(t *testing.T) { Action: TouchAction, Body: &taskBodyStr, } - if res := h.Handle(task.Action, task.Body); res.StatusCode != parade.TaskCompleted { + if res := h.Handle(task.Action, task.Body, task.NumSignalledFailures); res.StatusCode != parade.TaskCompleted { t.Errorf("expected status code: %s, got: %s", parade.TaskCompleted, res.StatusCode) } // read Destination diff --git a/export/tasks_generator.go b/export/tasks_generator.go index 2abad57d352..f86a3a10f0d 100644 --- a/export/tasks_generator.go +++ b/export/tasks_generator.go @@ -35,6 +35,7 @@ type StartData struct { FromCommitRef string `json:"from"` ToCommitRef string `json:"to"` ExportID string `json:"export_id"` + ExportConfig catalog.ExportConfiguration } type CopyData struct { @@ -52,9 +53,10 @@ type SuccessData struct { } type FinishData struct { - Repo string `json:"repo"` - Branch string `json:"branch"` - CommitRef string `json:"commitRef"` + Repo string `json:"repo"` + Branch string `json:"branch"` + CommitRef string `json:"commitRef"` + StatusPath string `json:"status_path"` } // Returns the "dirname" of path: everything up to the last "/" (excluding that slash). 
If @@ -260,7 +262,7 @@ type TasksGenerator struct { successTasksGenerator SuccessTasksTreeGenerator } -func GetStartTasks(repo, branch, fromCommitRef, toCommitRef, exportID string) ([]parade.TaskData, error) { +func GetStartTasks(repo, branch, fromCommitRef, toCommitRef, exportID string, config catalog.ExportConfiguration) ([]parade.TaskData, error) { one, zero := 1, 0 data := StartData{ Repo: repo, @@ -268,6 +270,7 @@ func GetStartTasks(repo, branch, fromCommitRef, toCommitRef, exportID string) ([ FromCommitRef: fromCommitRef, ToCommitRef: toCommitRef, ExportID: exportID, + ExportConfig: config, } body, err := json.Marshal(data) if err != nil { diff --git a/parade/action_manager.go b/parade/action_manager.go index 7903721eb93..a0c0e3c5d01 100644 --- a/parade/action_manager.go +++ b/parade/action_manager.go @@ -151,7 +151,7 @@ func (a *workerPool) start() { workerID := uuid.New() defer a.wg.Done() for task := range a.ch { - res := a.actor.Handle(task.Action, task.Body) + res := a.actor.Handle(task.Action, task.Body, task.NumSignalledFailures) err := a.parade.ReturnTask(task.ID, task.Token, res.Status, res.StatusCode) if err != nil { logging.Default().WithFields(logging.Fields{ diff --git a/parade/action_manager_test.go b/parade/action_manager_test.go index 2ed16ae5f9e..2ef7a11e4aa 100644 --- a/parade/action_manager_test.go +++ b/parade/action_manager_test.go @@ -1,11 +1,12 @@ package parade_test import ( - "github.com/treeverse/lakefs/parade" "strconv" "sync/atomic" "testing" "time" + + "github.com/treeverse/lakefs/parade" ) type ownTaskResult struct { @@ -51,7 +52,7 @@ type mockHandler struct { handleCalled int32 } -func (m *mockHandler) Handle(_ string, _ *string) parade.ActorResult { +func (m *mockHandler) Handle(_ string, _ *string, _ int) parade.ActorResult { atomic.AddInt32(&m.handleCalled, 1) return parade.ActorResult{} } diff --git a/parade/actor.go b/parade/actor.go index ac736e00821..5f3b971cd58 100644 --- a/parade/actor.go +++ b/parade/actor.go @@ -8,7 
+8,7 @@ type ActorResult struct { // Actor handles an action or a group of actions type Actor interface { // Handle performs actions with the given body and return the ActorResult - Handle(action string, body *string) ActorResult + Handle(action string, body *string, signalledErrors int) ActorResult // Actions returns the list of actions that could be performed by the Actor Actions() []string // ActorID returns the ID of the actor