Skip to content

Commit

Permalink
Merge pull request #949 from treeverse/feature/534-continuous
Browse files Browse the repository at this point in the history
Add continuous export
  • Loading branch information
arielshaqed authored Nov 26, 2020
2 parents d0b734b + eeeed6d commit 84a52ee
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 173 deletions.
39 changes: 31 additions & 8 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"io"
"time"

"github.com/treeverse/lakefs/db"
"github.com/lib/pq"
)

const (
Expand Down Expand Up @@ -145,24 +145,47 @@ type Cataloger interface {
// ExportStateCallback returns the new ref, state and message regarding the old ref and state
type ExportStateCallback func(oldRef string, state CatalogBranchExportStatus) (newRef string, newState CatalogBranchExportStatus, newMessage *string, err error)

// ExportConfiguration describes the export configuration of a branch, as passed on wire, used
// internally, and stored in DB.
type ExportConfiguration struct {
Path string `db:"export_path" json:"export_path"`
StatusPath string `db:"export_status_path" json:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"`
IsContinuous bool `db:"continuous" json:"is_continuous"`
}

// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database.
// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of
// ExportConfiguration embedded here.
type ExportConfigurationForBranch struct {
Repository string `db:"repository"`
Branch string `db:"branch"`

Path string `db:"export_path"`
StatusPath string `db:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"`
IsContinuous bool `db:"continuous"`
}

type PostCommitFunc func(ctx context.Context, repo, branch string, commitLog CommitLog) error
type PostMergeFunc func(ctx context.Context, repo, branch string, mergeResult MergeResult) error

// CatalogerHooks describes the hooks available for some operations on the catalog. Hooks are
// called in a current transaction context; if they return an error the transaction is rolled
// back. Because these transactions are current, the hook can see the effect the operation only
// on the passed transaction.
// called after the transaction ends; if they return an error they do not affect commit/merge.
type CatalogerHooks struct {
// PostCommit hooks are called at the end of a commit.
PostCommit []func(ctx context.Context, tx db.Tx, commitLog *CommitLog) error
PostCommit []PostCommitFunc

// PostMerge hooks are called at the end of a merge.
PostMerge []func(ctx context.Context, tx db.Tx, mergeResult *MergeResult) error
PostMerge []PostMergeFunc
}

func (h *CatalogerHooks) AddPostCommit(f func(context.Context, db.Tx, *CommitLog) error) *CatalogerHooks {
func (h *CatalogerHooks) AddPostCommit(f PostCommitFunc) *CatalogerHooks {
h.PostCommit = append(h.PostCommit, f)
return h
}

func (h *CatalogerHooks) AddPostMerge(f func(context.Context, db.Tx, *MergeResult) error) *CatalogerHooks {
func (h *CatalogerHooks) AddPostMerge(f PostMergeFunc) *CatalogerHooks {
h.PostMerge = append(h.PostMerge, f)
return h
}
35 changes: 3 additions & 32 deletions catalog/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,8 @@ import (
"database/sql/driver"
"fmt"
"strings"

"github.com/lib/pq"
)

// ExportConfiguration describes the export configuration of a branch, as passed on wire, used
// internally, and stored in DB.
type ExportConfiguration struct {
Path string `db:"export_path" json:"export_path"`
StatusPath string `db:"export_status_path" json:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp" json:"last_keys_in_prefix_regexp"`
IsContinuous bool `db:"continuous" json:"is_continuous"`
}

// ExportConfigurationForBranch describes how to export BranchID. It is stored in the database.
// Unfortunately golang sql doesn't know about embedded structs, so you get a useless copy of
// ExportConfiguration embedded here.
type ExportConfigurationForBranch struct {
Repository string `db:"repository"`
Branch string `db:"branch"`

Path string `db:"export_path"`
StatusPath string `db:"export_status_path"`
LastKeysInPrefixRegexp pq.StringArray `db:"last_keys_in_prefix_regexp"`
IsContinuous bool `db:"continuous"`
}

type CatalogBranchExportStatus string

const (
Expand All @@ -42,15 +18,10 @@ const (

// ExportStatus describes the current export status of a branch, as passed on wire, used
// internally, and stored in DB.
type ExportStatus struct {
CurrentRef string `db:"current_ref"`
State CatalogBranchExportStatus
}

type ExportState struct {
CurrentRef string
State CatalogBranchExportStatus
ErrorMessage *string
CurrentRef string `db:"current_ref"`
State CatalogBranchExportStatus `db:"state"`
ErrorMessage *string `db:"error_message"`
}

// nolint: stylecheck
Expand Down
18 changes: 9 additions & 9 deletions catalog/mvcc/cataloger_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,20 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa
Parents: []string{parentReference},
}

for _, hook := range c.hooks.PostCommit {
err = hook(ctx, tx, commitLog)
if err != nil {
// Roll tx back if a hook failed
return nil, err
}
}

return commitLog, nil
}, c.txOpts(ctx)...)
if err != nil {
return nil, err
}
return res.(*catalog.CommitLog), nil
commitLog := res.(*catalog.CommitLog)
for _, hook := range c.Hooks().PostCommit {
anotherErr := hook(ctx, repository, branch, *commitLog)
if anotherErr != nil && err == nil {
err = anotherErr
}
}

return commitLog, nil
}

func commitUpdateCommittedEntriesWithMaxCommit(tx db.Tx, branchID int64, commitID CommitID) (int64, error) {
Expand Down
86 changes: 30 additions & 56 deletions catalog/mvcc/cataloger_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/go-test/deep"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/testutil"
)

Expand Down Expand Up @@ -112,8 +111,8 @@ func TestCataloger_Commit(t *testing.T) {
got.CreationDate = tt.want.CreationDate
}
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Commit() got = %s, want = %s", spew.Sdump(got), spew.Sdump(tt.want))
if diffs := deep.Equal(got, tt.want); diffs != nil {
t.Errorf("unexpected Commit(): %s", diffs)
}
})
}
Expand Down Expand Up @@ -290,69 +289,44 @@ func TestCataloger_CommitTombstoneShouldNotChangeHistory(t *testing.T) {
}
}

type CommitData struct {
Repo string
Branch string
Log catalog.CommitLog
}

// CommitHookLogger - commit hook that will return an error if set by Err.
// When no Err is set it will log commit log into Logs.
type CommitHookLogger struct {
Err error
Logs []*catalog.CommitLog
Commits []CommitData
}

func (h *CommitHookLogger) Hook(_ context.Context, _ db.Tx, log *catalog.CommitLog) error {
if h.Err != nil {
return h.Err
}
h.Logs = append(h.Logs, log)
func (h *CommitHookLogger) Hook(_ context.Context, repo, branch string, log catalog.CommitLog) error {
h.Commits = append(h.Commits, CommitData{Repo: repo, Branch: branch, Log: log})
return nil
}

func TestCataloger_CommitHooks(t *testing.T) {
errHookFailed := errors.New("for testing")
tests := []struct {
name string
path string
hookErr error
wantErr error
}{
{
name: "no_block",
hookErr: nil,
},
{
name: "block",
hookErr: errHookFailed,
},
ctx := context.Background()
c := testCataloger(t)

// register hooks (more than one to verify all get called)
hooks := make([]CommitHookLogger, 2)
for i := range hooks {
c.Hooks().AddPostCommit(hooks[i].Hook)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)

// register hooks (more than one to verify all get called)
hooks := []CommitHookLogger{
{Err: tt.hookErr},
{Err: tt.hookErr},
}
for i := range hooks {
c.Hooks().AddPostCommit(hooks[i].Hook)
}
repository := testCatalogerRepo(t, ctx, c, "repository", "master")
_ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "")

repository := testCatalogerRepo(t, ctx, c, "repository", "master")
_ = testCatalogerCreateEntry(t, ctx, c, repository, catalog.DefaultBranchName, "/file1", nil, "")
commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"})
if err != nil {
t.Fatalf("Commit err=%s", err)
}

commitLog, err := c.Commit(ctx, repository, "master", "commit "+t.Name(), "tester", catalog.Metadata{"foo": "bar"})
// check that hook err is the commit error
if !errors.Is(tt.hookErr, err) {
t.Fatalf("Commit err=%s, expected=%s", err, tt.hookErr)
}
// on successful commit the commit log should be found on hook's logs
if err != nil {
return
}
for i := range hooks {
if len(hooks[i].Logs) != 1 || hooks[i].Logs[0] != commitLog {
t.Errorf("hook %d: expected one commit %+v but got logs: %s", i, commitLog, spew.Sprint(hooks[i].Logs))
}
}
})
for i := range hooks {
if diffs := deep.Equal(hooks[i].Commits, []CommitData{{Repo: repository, Branch: "master", Log: *commitLog}}); diffs != nil {
t.Errorf("hook %d: unexpected commit logs: %s", i, diffs)
}
}
}
25 changes: 25 additions & 0 deletions catalog/mvcc/cataloger_diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,31 @@ import (
"github.com/treeverse/lakefs/testutil"
)

func TestCataloger_DiffEmpty(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repo", "master")

// create N files and commit
commitChanges := func(n int, msg, branch string) {
for i := 0; i < n; i++ {
testCatalogerCreateEntry(t, ctx, c, repository, branch, "/file"+strconv.Itoa(i), nil, branch)
}
_, err := c.Commit(ctx, repository, branch, msg, "tester", nil)
testutil.MustDo(t, msg, err)
}
commitChanges(10, "Changes on master", "master")

res, hasMore, err := c.Diff(ctx, repository, "master", "master", catalog.DiffParams{Limit: 10})
testutil.MustDo(t, "Diff", err)
if len(res) != 0 {
t.Errorf("Diff: got %+v but expected nothing", res)
}
if hasMore {
t.Errorf("Diff: got *more* diffs but expected nothing")
}
}

func TestCataloger_Diff(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
Expand Down
Loading

0 comments on commit 84a52ee

Please sign in to comment.