Skip to content

Commit

Permalink
Run hooks outside of transactions
Browse files Browse the repository at this point in the history
Catalog methods do not support transactions anyway.  So there is no way for a hook to use
its transaction.  OTOH, running inside the transaction means all catalog methods that the
hook calls will NOT see the commit or merge.  So fix that.
  • Loading branch information
arielshaqed committed Nov 25, 2020
1 parent 43e6261 commit 5e77681
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 79 deletions.
9 changes: 4 additions & 5 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package catalog

import (
"context"
"io"
"time"

"github.com/treeverse/lakefs/db"
Expand Down Expand Up @@ -144,13 +145,11 @@ type Cataloger interface {
// ExportStateCallback returns the new ref, state and message regarding the old ref and state
type ExportStateCallback func(oldRef string, state CatalogBranchExportStatus) (newRef string, newState CatalogBranchExportStatus, newMessage *string, err error)

type PostCommitFunc = func(ctx context.Context, tx db.Tx, repo, branch string, commitLog *CommitLog) error
type PostMergeFunc = func(ctx context.Context, tx db.Tx, repo, branch string, mergeResult *MergeResult) error
type PostCommitFunc = func(ctx context.Context, repo, branch string, commitLog *CommitLog) error
type PostMergeFunc = func(ctx context.Context, repo, branch string, mergeResult *MergeResult) error

// CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are
// called in a current transaction context; if they return an error the transaction is rolled
// back. Because these transactions are current, the hook can see the effect the operation only
// on the passed transaction.
// called after the transaction ends; if they return an error they do not affect commit/merge.
type CatalogerHooks struct {
// PostCommit hooks are called at the end of a commit.
PostCommit []PostCommitFunc
Expand Down
18 changes: 9 additions & 9 deletions catalog/mvcc/cataloger_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,20 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa
Parents: []string{parentReference},
}

for _, hook := range c.hooks.PostCommit {
err = hook(ctx, tx, repository, branch, commitLog)
if err != nil {
// Roll tx back if a hook failed
return nil, err
}
}

return commitLog, nil
}, c.txOpts(ctx)...)
if err != nil {
return nil, err
}
return res.(*catalog.CommitLog), nil
commitLog := res.(*catalog.CommitLog)
for _, hook := range c.Hooks().PostCommit {
anotherErr := hook(ctx, repository, branch, commitLog)
if anotherErr != nil && err == nil {
err = anotherErr
}
}

return commitLog, nil
}

func commitUpdateCommittedEntriesWithMaxCommit(tx db.Tx, branchID int64, commitID CommitID) (int64, error) {
Expand Down
70 changes: 19 additions & 51 deletions catalog/mvcc/cataloger_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/go-test/deep"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/testutil"
)

Expand Down Expand Up @@ -292,72 +291,41 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) {

type CommitData struct {
Repo, Branch string
Log CommitLog
Log catalog.CommitLog
}

// CommitHookLogger - commit hook that will return an error if set by Err.
// When no Err is set it will log commit log into Logs.
type CommitHookLogger struct {
Err error
Commits []CommitData
}

func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, log *CommitLog) error {
if h.Err != nil {
return h.Err
}
func (h *CommitHookLogger) Hook(_ context.Context, repo, branch string, log *catalog.CommitLog) error {
h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: *log})
return nil
}

func TestCataloger_CommitHooks(t *testing.T) {
errHookFailed := errors.New("for testing")
tests := []struct {
name string
path string
hookErr error
wantErr error
}{
{
name: "no_block",
hookErr: nil,
},
{
name: "block",
hookErr: errHookFailed,
},
ctx := context.Background()
c := testCataloger(t)

// register hooks (more than one to verify all get called)
hooks := make([]CommitHookLogger, 2)
for i := range hooks {
c.Hooks().AddPostCommit(hooks[i].Hook)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)

// register hooks (more than one to verify all get called)
hooks := []CommitHookLogger{
{Err: tt.hookErr},
{Err: tt.hookErr},
}
for i := range hooks {
c.Hooks().AddPostCommit(hooks[i].Hook)
}
repository := testCatalogerRepo(t, ctx, c, "repository", "master")
_ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "")

repository := testCatalogerRepo(t, ctx, c, "repository", "master")
_ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "")
commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"})
if err != nil {
t.Fatalf("Commit err=%s", err)
}

commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"})
// check that hook err is the commit error
if !errors.Is(tt.hookErr, err) {
t.Fatalf("Commit err=%s, expected=%s", err, tt.hookErr)
}
// on successful commit the commit log should be found on hook's logs
if err != nil {
return
}
for i := range hooks {
if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil {
t.Errorf("hook %d: unexpected commit logs: %s", i, diffs)
}
}
})
for i := range hooks {
if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil {
t.Errorf("hook %d: unexpected commit logs: %s", i, diffs)
}
}
}
17 changes: 10 additions & 7 deletions catalog/mvcc/cataloger_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,18 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
return nil, err
}
mergeResult.Reference = MakeReference(rightBranch, nextCommitID)
for _, hook := range c.hooks.PostMerge {
err = hook(ctx, tx, repository, rightBranch, mergeResult)
if err != nil {
// Roll tx back if a hook failed
return nil, err
}
}
return nil, nil
}, c.txOpts(ctx, db.ReadCommitted())...)

if err == nil {
for _, hook := range c.Hooks().PostMerge {
anotherErr := hook(ctx, repository, rightBranch, mergeResult)
if anotherErr != nil && err == nil {
err = anotherErr
}
}
}

return mergeResult, err
}

Expand Down
11 changes: 7 additions & 4 deletions catalog/mvcc/cataloger_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/go-test/deep"
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/testutil"
)

Expand Down Expand Up @@ -139,8 +138,12 @@ func TestCataloger_Merge_FromParentConflicts(t *testing.T) {
if !errors.Is(err, catalog.ErrConflictFound) {
t.Errorf("Merge err = %s, expected conflict with err = %s", err, catalog.ErrConflictFound)
}
if res.Reference != "" {
t.Errorf("Merge reference = %s, expected to be empty", res.Reference)
if res == nil {
t.Errorf("Merge returned nil, err %s", err)
} else {
if res.Reference != "" {
t.Errorf("Merge reference = %s, expected to be empty", res.Reference)
}
}
}

Expand Down Expand Up @@ -1195,7 +1198,7 @@ type MergeHookLogger struct {
Merges []MergeData
}

func (h *MergeHookLogger) Hook(_ context.Context, _ db.Tx, repo, branch string, result *MergeResult) error {
func (h *MergeHookLogger) Hook(_ context.Context, repo, branch string, result *MergeResult) error {
if h.Err != nil {
return h.Err
}
Expand Down
5 changes: 2 additions & 3 deletions export/export_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/treeverse/lakefs/block"
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/parade"
)
Expand Down Expand Up @@ -339,14 +338,14 @@ func startExport(l logging.Logger, p parade.Parade, c catalog.Cataloger, op inte
}

// exportCommitHook is a cataloger PostCommit hook for continuous export.
func (h *Handler) exportCommitHook(ctx context.Context, _ db.Tx, repo, branch string, log *catalog.CommitLog) error {
func (h *Handler) exportCommitHook(ctx context.Context, repo, branch string, log *catalog.CommitLog) error {
l := logging.Default().
WithFields(logging.Fields{"repo": repo, "branch": branch, "message": log.Message, "at": log.CreationDate.String()})
return startExport(l, h.parade, h.cataloger, *log, repo, branch)
}

// exportMergeHook is a cataloger PostMerge hook for continuous export.
func (h *Handler) exportMergeHook(ctx context.Context, _ db.Tx, repo, branch string, merge *catalog.MergeResult) error {
func (h *Handler) exportMergeHook(ctx context.Context, repo, branch string, merge *catalog.MergeResult) error {
l := logging.Default().
WithFields(logging.Fields{"repo": repo, "branch": branch, "reference": merge.Reference})
return startExport(l, h.parade, h.cataloger, *merge, repo, branch)
Expand Down

0 comments on commit 5e77681

Please sign in to comment.