Skip to content

Commit

Permalink
Continuous export: start a new export after each commit or merge
Browse files Browse the repository at this point in the history
Hook onto commits and merges to start an export.  This export will attempt to export to the
branch tip.
  • Loading branch information
arielshaqed committed Nov 25, 2020
1 parent cfae470 commit a4b3ec0
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 29 deletions.
11 changes: 7 additions & 4 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,24 +145,27 @@ type Cataloger interface {
// ExportStateCallback returns the new ref, state and message regarding the old ref and state
type ExportStateCallback func(oldRef string, state CatalogBranchExportStatus) (newRef string, newState CatalogBranchExportStatus, newMessage *string, err error)

type PostCommitFunc = func(ctx context.Context, tx db.Tx, repo, branch string, commitLog *CommitLog) error
type PostMergeFunc = func(ctx context.Context, tx db.Tx, repo, branch string, mergeResult *MergeResult) error

// CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are
// called in a current transaction context; if they return an error the transaction is rolled
// back. Because these transactions are current, the hook can see the effect the operation only
// on the passed transaction.
type CatalogerHooks struct {
// PostCommit hooks are called at the end of a commit.
PostCommit []func(ctx context.Context, tx db.Tx, commitLog *CommitLog) error
PostCommit []PostCommitFunc

// PostMerge hooks are called at the end of a merge.
PostMerge []func(ctx context.Context, tx db.Tx, mergeResult *MergeResult) error
PostMerge []PostMergeFunc
}

func (h *CatalogerHooks) AddPostCommit(f func(context.Context, db.Tx, *CommitLog) error) *CatalogerHooks {
func (h *CatalogerHooks) AddPostCommit(f PostCommitFunc) *CatalogerHooks {
h.PostCommit = append(h.PostCommit, f)
return h
}

func (h *CatalogerHooks) AddPostMerge(f func(context.Context, db.Tx, *MergeResult) error) *CatalogerHooks {
func (h *CatalogerHooks) AddPostMerge(f PostMergeFunc) *CatalogerHooks {
h.PostMerge = append(h.PostMerge, f)
return h
}
2 changes: 1 addition & 1 deletion catalog/mvcc/cataloger_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa
}

for _, hook := range c.hooks.PostCommit {
err = hook(ctx, tx, commitLog)
err = hook(ctx, tx, repository, branch, commitLog)
if err != nil {
// Roll tx back if a hook failed
return nil, err
Expand Down
25 changes: 15 additions & 10 deletions catalog/mvcc/cataloger_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/go-test/deep"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/testutil"
Expand Down Expand Up @@ -112,8 +112,8 @@ func TestCataloger_Commit(t *testing.T) {
got.CreationDate = tt.want.CreationDate
}
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Commit() got = %s, want = %s", spew.Sdump(got), spew.Sdump(tt.want))
if diffs := deep.Equal(got, tt.want); diffs != nil {
t.Errorf("unexpected Commit(): %s", diffs)
}
})
}
Expand Down Expand Up @@ -290,18 +290,23 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) {
}
}

type CommitData struct {
Repo, Branch string
Log CommitLog
}

// CommitHookLogger - commit hook that will return an error if set by Err.
// When no Err is set it will log commit log into Logs.
type CommitHookLogger struct {
Err error
Logs []*catalog.CommitLog
Err error
Commits []CommitData
}

func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, log *catalog.CommitLog) error {
func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, log *CommitLog) error {
if h.Err != nil {
return h.Err
}
h.Logs = append(h.Logs, log)
h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: *log})
return nil
}

Expand Down Expand Up @@ -349,8 +354,8 @@ func TestCataloger_CommitHooks(t *testing.T) {
return
}
for i := range hooks {
if len(hooks[i].Logs) != 1 || hooks[i].Logs[0] != commitLog {
t.Errorf("hook %d: expected one commit %+v but got logs: %s", i, commitLog, spew.Sprint(hooks[i].Logs))
if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil {
t.Errorf("hook %d: unexpected commit logs: %s", i, diffs)
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion catalog/mvcc/cataloger_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
}
mergeResult.Reference = MakeReference(rightBranch, nextCommitID)
for _, hook := range c.hooks.PostMerge {
err = hook(ctx, tx, mergeResult)
err = hook(ctx, tx, repository, rightBranch, mergeResult)
if err != nil {
// Roll tx back if a hook failed
return nil, err
Expand Down
21 changes: 15 additions & 6 deletions catalog/mvcc/cataloger_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,18 +1183,23 @@ func TestCataloger_MergeFromChildAfterMergeFromParent(t *testing.T) {
}
}

type MergeData struct {
Repo, Branch string
Result MergeResult
}

// MergeHookLogger - merge hook that will return an error if set by Err.
// When no Err is set it will log merge log into Logs.
type MergeHookLogger struct {
Err error
Logs []*catalog.MergeResult
Err error
Merges []MergeData
}

func (h *MergeHookLogger) Hook(_ context.Context, _ db.Tx, log *catalog.MergeResult) error {
func (h *MergeHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, result *MergeResult) error {
if h.Err != nil {
return h.Err
}
h.Logs = append(h.Logs, log)
h.Merges = append(h.Merges, MergeData{Repo: repo, Branch: branch, Result: *result})
return nil
}

Expand Down Expand Up @@ -1242,9 +1247,13 @@ func TestCataloger_Merge_Hooks(t *testing.T) {
{Path: "/file1"},
})

expected := []*catalog.MergeResult{res}
expected := []MergeData{{
Repo: repository,
Branch: "master",
Result: *res,
}}
for _, hook := range hooks {
if diffs := deep.Equal(expected, hook.Logs); diffs != nil {
if diffs := deep.Equal(expected, hook.Merges); diffs != nil {
t.Error("hook received unexpected merge result: ", diffs)
}
}
Expand Down
20 changes: 16 additions & 4 deletions export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package export

import (
"context"
"db"
"errors"
"fmt"

Expand Down Expand Up @@ -66,13 +67,16 @@ var ErrConflictingRefs = errors.New("conflicting references")
func ExportBranchDone(parade parade.Parade, cataloger catalog.Cataloger, status catalog.CatalogBranchExportStatus, statusMsg *string, repo, branch, commitRef string) error {
if status == catalog.ExportStatusSuccess {
// Start the next export if continuous.
exportConfiguration, err := cataloger.GetExportConfigurationForBranch(repo, branch)
isContinuous, err := hasContinuousExport(cataloger, repo, branch)
if err != nil {
return fmt.Errorf("check whether export configuration is continuous for repo %s branch %s: %w", repo, branch, err)
// Consider branch export failed: it was supposed to be continuous but
// might have stopped. So set an error for the admin to fix before
// re-enabling continuous export.
return err
}
if exportConfiguration.IsContinuous {
if isContinuous {
_, err := ExportBranchStart(parade, cataloger, repo, branch)
if err == ErrExportInProgress {
if errors.Is(err, ErrExportInProgress) {
logging.Default().WithFields(logging.Fields{
"repo": repo,
"branch": branch,
Expand Down Expand Up @@ -107,3 +111,11 @@ func ExportBranchRepair(cataloger catalog.Cataloger, repo, branch string) error
return oldRef, catalog.ExportStatusRepaired, nil, nil
})
}

func hasContinuousExport(c catalog.Cataloger, repo, branch string) (bool, error) {
exportConfiguration, err := c.GetExportConfigurationForBranch(repo, branch)
if err != nil {
return false, fmt.Errorf("check whether export configuration is continuous for repo %s branch %s: %w", repo, branch, err)
}
return exportConfiguration.IsContinuous, nil
}
47 changes: 44 additions & 3 deletions export/export_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ import (
"regexp"
"strings"

"github.com/treeverse/lakefs/catalog"

"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/parade"
)
Expand All @@ -25,11 +24,17 @@ type Handler struct {
}

func NewHandler(adapter block.Adapter, cataloger catalog.Cataloger, parade parade.Parade) *Handler {
return &Handler{
ret := &Handler{
adapter: adapter,
cataloger: cataloger,
parade: parade,
}
if cataloger != nil {
hooks := cataloger.Hooks()
hooks.AddPostCommit(ret.exportCommitHook)
hooks.AddPostMerge(ret.exportMergeHook)
}
return ret
}

type TaskBody struct {
Expand Down Expand Up @@ -291,3 +296,39 @@ func (h *Handler) Actions() []string {
func (h *Handler) ActorID() parade.ActorID {
return actorName
}

// exportCommitHook is a cataloger PostCommit hook for continuous export.
func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error {
isContinuous, err := hasContinuousExport(h.cataloger, repo, branch)
if err != nil {
// FAIL this commit: if we were meant to export it and did not then in practice
// there was no commit.
return fmt.Errorf("check continuous export for commit %+v: %w", *log, err)
}
if !isContinuous {
return nil
}
_, err = ExportBranchStart(h.parade, h.cataloger, repo, branch)
if errors.Is(err, ErrExportInProgress) {
err = nil
}
return err
}

// exportMergeHook is a cataloger PostMerge hook for continuous export.
func (h *Handler) exportMergeHook(ctx context.Context, _ db.Tx, repo, branch string, merge *catalog.MergeResult) error {
isContinuous, err := hasContinuousExport(h.cataloger, repo, branch)
if err != nil {
// FAIL this merge: if we were meant to export it and did not then in practice
// there was no merge.
return fmt.Errorf("check continuous export for merge %+v: %w", *merge, err)
}
if !isContinuous {
return nil
}
_, err = ExportBranchStart(h.parade, h.cataloger, repo, branch)
if errors.Is(err, ErrExportInProgress) {
err = nil
}
return err
}

0 comments on commit a4b3ec0

Please sign in to comment.