Skip to content

Commit

Permalink
[CR] Refactor merge & commit hooks
Browse files Browse the repository at this point in the history
Also extract a smaller interface `cataloger.Exporter`
  • Loading branch information
arielshaqed committed Nov 25, 2020
1 parent 4871e49 commit 40fec4b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 25 deletions.
1 change: 0 additions & 1 deletion catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package catalog

import (
"context"
"io"
"time"

"github.com/treeverse/lakefs/db"
Expand Down
4 changes: 2 additions & 2 deletions export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"errors"
"fmt"

"github.com/jackc/pgx/v4"
nanoid "github.com/matoous/go-nanoid"

"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/parade"

Expand Down Expand Up @@ -131,7 +131,7 @@ func ExportBranchRepair(cataloger catalog.Cataloger, repo, branch string) error

func hasContinuousExport(c catalog.Cataloger, repo, branch string) (bool, error) {
exportConfiguration, err := c.GetExportConfigurationForBranch(repo, branch)
if errors.Is(err, pgx.ErrNoRows) {
if errors.Is(err, db.ErrNotFound) {
return false, nil
}
if err != nil {
Expand Down
37 changes: 15 additions & 22 deletions export/export_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (h *Handler) start(body *string) error {
return h.generateTasks(startData, startData.ExportConfig, &finishBodyStr, repo.StorageNamespace)
}

func (h *Handler) generateTasks(startData StartData, config catalog.ExportConfiguration, finishBodyStr *string, storageNamespace string) error {
func (h *Handler) generateTasks(startData StartData, config catalog.Cataloger, finishBodyStr *string, storageNamespace string) error {
tasksGenerator := NewTasksGenerator(startData.ExportID, config.Path, getGenerateSuccess(config.LastKeysInPrefixRegexp), finishBodyStr, storageNamespace)
var diffs catalog.Differences
var err error
Expand Down Expand Up @@ -298,41 +298,34 @@ func (h *Handler) ActorID() parade.ActorID {
return actorName
}

// exportCommitHook is a cataloger PostCommit hook for continuous export.
func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error {
l := logging.Default().
WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()})
isContinuous, err := hasContinuousExport(h.cataloger, repo, branch)
func startExport(l logging.Logger, p parade.Parade, c catalog.Cataloger, op interface{}, repo, branch string) error {
isContinuous, err := hasContinuousExport(c, repo, branch)
if err != nil {
// FAIL this commit: if we were meant to export it and did not then in practice
// there was no commit.
return fmt.Errorf("check continuous export for commit %+v: %w", *log, err)
return fmt.Errorf("check continuous export for %+v: %w", op, err)
}
if !isContinuous {
return nil
}
exportID, err := ExportBranchStart(h.parade, h.cataloger, repo, branch)
exportID, err := ExportBranchStart(p, c, repo, branch)
l.WithField("export_id", exportID).Info("continuous export started")
if errors.Is(err, ErrExportInProgress) {
err = nil
}
return err
}

// exportCommitHook is a cataloger PostCommit hook for continuous export.
func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error {
l := logging.Default().
WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()})
return startExport(l, h.parade, h.cataloger, *log, repo, branch)
}

// exportMergeHook is a cataloger PostMerge hook for continuous export.
func (h *Handler) exportMergeHook(ctx context.Context, _ db.Tx, repo, branch string, merge *catalog.MergeResult) error {
isContinuous, err := hasContinuousExport(h.cataloger, repo, branch)
if err != nil {
// FAIL this merge: if we were meant to export it and did not then in practice
// there was no merge.
return fmt.Errorf("check continuous export for merge %+v: %w", *merge, err)
}
if !isContinuous {
return nil
}
_, err = ExportBranchStart(h.parade, h.cataloger, repo, branch)
if errors.Is(err, ErrExportInProgress) {
err = nil
}
return err
l := logging.Default().
WithFields(logging.Fields{"repo": repo, "branch": branch, "reference": merge.Reference})
return startExport(l, h.parade, h.cataloger, *merge, repo, branch)
}

0 comments on commit 40fec4b

Please sign in to comment.