diff --git a/catalog/cataloger.go b/catalog/cataloger.go index 6e31f5af719..8c698ba526f 100644 --- a/catalog/cataloger.go +++ b/catalog/cataloger.go @@ -2,6 +2,7 @@ package catalog import ( "context" + "io" "time" "github.com/treeverse/lakefs/db" @@ -144,13 +145,11 @@ type Cataloger interface { // ExportStateCallback returns the new ref, state and message regarding the old ref and state type ExportStateCallback func(oldRef string, state CatalogBranchExportStatus) (newRef string, newState CatalogBranchExportStatus, newMessage *string, err error) -type PostCommitFunc = func(ctx context.Context, tx db.Tx, repo, branch string, commitLog *CommitLog) error -type PostMergeFunc = func(ctx context.Context, tx db.Tx, repo, branch string, mergeResult *MergeResult) error +type PostCommitFunc = func(ctx context.Context, repo, branch string, commitLog *CommitLog) error +type PostMergeFunc = func(ctx context.Context, repo, branch string, mergeResult *MergeResult) error // CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are -// called in a current transaction context; if they return an error the transaction is rolled -// back. Because these transactions are current, the hook can see the effect the operation only -// on the passed transaction. +// called after the transaction ends; if they return an error they do not affect commit/merge. type CatalogerHooks struct { // PostCommit hooks are called at the end of a commit. PostCommit []PostCommitFunc diff --git a/catalog/mvcc/cataloger_commit.go b/catalog/mvcc/cataloger_commit.go index 25ba145f4a6..d2bc0a1299c 100644 --- a/catalog/mvcc/cataloger_commit.go +++ b/catalog/mvcc/cataloger_commit.go @@ -76,20 +76,20 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa Parents: []string{parentReference}, } - for _, hook := range c.hooks.PostCommit { - err = hook(ctx, tx, repository, branch, commitLog) - if err != nil { - // Roll tx back if a hook failed - return nil, err - } - } - return commitLog, nil }, c.txOpts(ctx)...) if err != nil { return nil, err } - return res.(*catalog.CommitLog), nil + commitLog := res.(*catalog.CommitLog) + for _, hook := range c.Hooks().PostCommit { + anotherErr := hook(ctx, repository, branch, commitLog) + if anotherErr != nil && err == nil { + err = anotherErr + } + } + + return commitLog, nil } func commitUpdateCommittedEntriesWithMaxCommit(tx db.Tx, branchID int64, commitID CommitID) (int64, error) { diff --git a/catalog/mvcc/cataloger_commit_test.go b/catalog/mvcc/cataloger_commit_test.go index 1a2045e6cb3..cffb47111d3 100644 --- a/catalog/mvcc/cataloger_commit_test.go +++ b/catalog/mvcc/cataloger_commit_test.go @@ -12,7 +12,6 @@ import ( "github.com/go-test/deep" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/testutil" ) @@ -292,72 +291,41 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) { type CommitData struct { Repo, Branch string - Log CommitLog + Log catalog.CommitLog } // CommitHookLogger - commit hook that will return an error if set by Err. // When no Err is set it will log commit log into Logs. type CommitHookLogger struct { - Err error Commits []CommitData } -func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, log *CommitLog) error { - if h.Err != nil { - return h.Err - } +func (h *CommitHookLogger) Hook(_ context.Context, repo, branch string, log *catalog.CommitLog) error { h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: *log}) return nil } func TestCataloger_CommitHooks(t *testing.T) { - errHookFailed := errors.New("for testing") - tests := []struct { - name string - path string - hookErr error - wantErr error - }{ - { - name: "no_block", - hookErr: nil, - }, - { - name: "block", - hookErr: errHookFailed, - }, + ctx := context.Background() + c := testCataloger(t) + + // register hooks (more than one to verify all get called) + hooks := make([]CommitHookLogger, 2) + for i := range hooks { + c.Hooks().AddPostCommit(hooks[i].Hook) } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - c := testCataloger(t) - // register hooks (more than one to verify all get called) - hooks := []CommitHookLogger{ - {Err: tt.hookErr}, - {Err: tt.hookErr}, - } - for i := range hooks { - c.Hooks().AddPostCommit(hooks[i].Hook) - } + repository := testCatalogerRepo(t, ctx, c, "repository", "master") + _ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "") - repository := testCatalogerRepo(t, ctx, c, "repository", "master") - _ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "") + commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"}) + if err != nil { + t.Fatalf("Commit err=%s", err) + } - commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"}) - // check that hook err is the commit error - if !errors.Is(tt.hookErr, err) { - t.Fatalf("Commit err=%s, expected=%s", err, tt.hookErr) - } - // on successful commit the commit log should be found on hook's logs - if err != nil { - return - } - for i := range hooks { - if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil { - t.Errorf("hook %d: unexpected commit logs: %s", i, diffs) - } - } - }) + for i := range hooks { + if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil { + t.Errorf("hook %d: unexpected commit logs: %s", i, diffs) + } } } diff --git a/catalog/mvcc/cataloger_merge.go b/catalog/mvcc/cataloger_merge.go index 3b669c04d8a..8fd91a1895f 100644 --- a/catalog/mvcc/cataloger_merge.go +++ b/catalog/mvcc/cataloger_merge.go @@ -89,15 +89,18 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran return nil, err } mergeResult.Reference = MakeReference(rightBranch, nextCommitID) - for _, hook := range c.hooks.PostMerge { - err = hook(ctx, tx, repository, rightBranch, mergeResult) - if err != nil { - // Roll tx back if a hook failed - return nil, err - } - } return nil, nil }, c.txOpts(ctx, db.ReadCommitted())...) + + if err == nil { + for _, hook := range c.Hooks().PostMerge { + anotherErr := hook(ctx, repository, rightBranch, mergeResult) + if anotherErr != nil && err == nil { + err = anotherErr + } + } + } + return mergeResult, err } diff --git a/catalog/mvcc/cataloger_merge_test.go b/catalog/mvcc/cataloger_merge_test.go index 11d73b2908b..dead34b702c 100644 --- a/catalog/mvcc/cataloger_merge_test.go +++ b/catalog/mvcc/cataloger_merge_test.go @@ -8,7 +8,6 @@ import ( "github.com/go-test/deep" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/testutil" ) @@ -139,8 +138,12 @@ func TestCataloger_Merge_FromParentConflicts(t *testing.T) { if !errors.Is(err, catalog.ErrConflictFound) { t.Errorf("Merge err = %s, expected conflict with err = %s", err, catalog.ErrConflictFound) } - if res.Reference != "" { - t.Errorf("Merge reference = %s, expected to be empty", res.Reference) + if res == nil { + t.Errorf("Merge returned nil, err %s", err) + } else { + if res.Reference != "" { + t.Errorf("Merge reference = %s, expected to be empty", res.Reference) + } } } @@ -1195,7 +1198,7 @@ type MergeHookLogger struct { Merges []MergeData } -func (h *MergeHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, result *MergeResult) error { +func (h *MergeHookLogger) Hook(_ context.Context, repo, branch string, result *MergeResult) error { if h.Err != nil { return h.Err } diff --git a/export/export_handler.go b/export/export_handler.go index 5999ba09c35..23ede9b9c20 100644 --- a/export/export_handler.go +++ b/export/export_handler.go @@ -11,7 +11,6 @@ import ( "github.com/treeverse/lakefs/block" "github.com/treeverse/lakefs/catalog" - "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/parade" ) @@ -339,14 +338,14 @@ func startExport(l logging.Logger, p parade.Parade, c catalog.Cataloger, op inte } // exportCommitHook is a cataloger PostCommit hook for continuous export. -func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error { +func (h *Handler) exportCommitHook(ctx context.Context, repo, branch string, log *catalog.CommitLog) error { l := logging.Default(). WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()}) return startExport(l, h.parade, h.cataloger, *log, repo, branch) } // exportMergeHook is a cataloger PostMerge hook for continuous export. -func (h *Handler) exportMergeHook(ctx context.Context, _ db.Tx, repo, branch string, merge *catalog.MergeResult) error { +func (h *Handler) exportMergeHook(ctx context.Context, repo, branch string, merge *catalog.MergeResult) error { l := logging.Default(). WithFields(logging.Fields{"repo": repo, "branch": branch, "reference": merge.Reference}) return startExport(l, h.parade, h.cataloger, *merge, repo, branch)