Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feature/tiered-storage
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 committed Dec 2, 2020
2 parents e8b8c87 + 83478b7 commit add7902
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 14 deletions.
12 changes: 8 additions & 4 deletions api/api_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler {
})
}

c.deps.Collector.CollectEvent("global", "init")

username := swag.StringValue(setupReq.User.Username)
var cred *model.Credential
if setupReq.User.Key == nil {
Expand All @@ -276,7 +274,13 @@ func (c *Controller) SetupLakeFSHandler() setupop.SetupLakeFSHandler {
return setupop.NewSetupLakeFSDefault(http.StatusInternalServerError).
WithPayload(&models.Error{Message: err.Error()})
}

metadata, err := c.deps.MetadataManager.Write()
if err != nil {
c.deps.logger.Error("failed to write metadata after setup")
} else {
c.deps.Collector.SetInstallationID(metadata[auth.InstallationIDKeyName])
}
c.deps.Collector.CollectEvent("global", "init")
return setupop.NewSetupLakeFSOK().WithPayload(&models.CredentialsWithSecret{
AccessKeyID: cred.AccessKeyID,
AccessSecretKey: cred.AccessSecretKey,
Expand Down Expand Up @@ -1289,7 +1293,7 @@ func (c *Controller) RevertBranchHandler() branches.RevertBranchHandler {
ctx := c.Context()
switch swag.StringValue(params.Revert.Type) {
case models.RevertCreationTypeCommit:
err = cataloger.RollbackCommit(ctx, params.Repository, params.Revert.Commit)
err = cataloger.RollbackCommit(ctx, params.Repository, params.Branch, params.Revert.Commit)
case models.RevertCreationTypeCommonPrefix:
err = cataloger.ResetEntries(ctx, params.Repository, params.Branch, params.Revert.Path)
case models.RevertCreationTypeReset:
Expand Down
2 changes: 2 additions & 0 deletions api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (m *mockCollector) CollectMetadata(_ *stats.Metadata) {}

func (m *mockCollector) CollectEvent(_, _ string) {}

func (m *mockCollector) SetInstallationID(_ string) {}

func getHandler(t *testing.T, blockstoreType string, opts ...testutil.GetDBOption) (http.Handler, *dependencies) {
conn, handlerDatabaseURI := testutil.GetDB(t, databaseURI, opts...)
var blockAdapter block.Adapter
Expand Down
2 changes: 1 addition & 1 deletion catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ type Cataloger interface {
Commit(ctx context.Context, repository, branch string, message string, committer string, metadata Metadata) (*CommitLog, error)
GetCommit(ctx context.Context, repository, reference string) (*CommitLog, error)
ListCommits(ctx context.Context, repository, branch string, fromReference string, limit int) ([]*CommitLog, bool, error)
RollbackCommit(ctx context.Context, repository, reference string) error
RollbackCommit(ctx context.Context, repository, branch string, reference string) error

Diff(ctx context.Context, repository, leftReference string, rightReference string, params DiffParams) (Differences, bool, error)
DiffUncommitted(ctx context.Context, repository, branch string, limit int, after string) (Differences, bool, error)
Expand Down
1 change: 1 addition & 0 deletions catalog/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ var (
ErrUnsupportedDelimiter = errors.New("unsupported delimiter")
ErrBadTypeConversion = errors.New("bad type")
ErrExportFailed = errors.New("export failed")
ErrRollbackWithActiveBranch = fmt.Errorf("%w: rollback with active branch", ErrFeatureNotSupported)
)
1 change: 1 addition & 0 deletions catalog/mvcc/cataloger_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func applyDiffChangesToRightBranch(tx db.Tx, mergeBatch mergeBatchRecords, previ
}
return nil
}

func insertMergeCommit(tx db.Tx, relation RelationType, leftID int64, rightID int64, nextCommitID CommitID, previousMaxCommitID CommitID, committer string, msg string, metadata catalog.Metadata) error {
var childNewLineage []int64
leftLastCommitID, err := getLastCommitIDByBranchID(tx, leftID)
Expand Down
71 changes: 63 additions & 8 deletions catalog/mvcc/cataloger_rollback_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,71 @@ package mvcc

import (
"context"
"fmt"

"github.com/treeverse/lakefs/catalog"

"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/db"
)

func (c *cataloger) RollbackCommit(ctx context.Context, repository, reference string) error {
c.log.WithContext(ctx).WithFields(logging.Fields{
"repository": repository,
"reference": reference,
}).Debug("Rollback commit - feature not supported")
return catalog.ErrFeatureNotSupported
func (c *cataloger) RollbackCommit(ctx context.Context, repository, branch, reference string) error {
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "reference", IsValid: ValidateReference(reference)},
}); err != nil {
return err
}

ref, err := ParseRef(reference)
if err != nil {
return err
}
if ref.CommitID <= UncommittedID {
return catalog.ErrInvalidReference
}
if ref.Branch != branch {
return catalog.ErrInvalidReference
}

_, err = c.db.Transact(func(tx db.Tx) (interface{}, error) {
// extract branch id from reference
branchID, err := getBranchID(tx, repository, ref.Branch, LockTypeUpdate)
if err != nil {
return nil, err
}

// validate no child branch point to parent commit
var count int
err = tx.GetPrimitive(&count, `SELECT COUNT(*) from catalog_commits
WHERE merge_source_branch = $1 AND merge_source_commit > $2 AND merge_type = 'from_parent'`,
branchID, ref.CommitID)
if err != nil {
return nil, fmt.Errorf("check merge with branch: %w", err)
}
if count > 0 {
return nil, catalog.ErrRollbackWithActiveBranch
}

// delete all commits after this commit on this branch
_, err = tx.Exec(`DELETE FROM catalog_commits WHERE branch_id = $1 AND commit_id > $2`,
branchID, ref.CommitID)
if err != nil {
return nil, fmt.Errorf("delete commits on branch %d, after commit %d: %w", branchID, ref.CommitID, err)
}

// delete all entries created after this commit
_, err = tx.Exec(`DELETE FROM catalog_entries WHERE branch_id = $1 AND min_commit > $2`,
branchID, ref.CommitID)
if err != nil {
return nil, fmt.Errorf("delete entries %d, after min commit %d: %w", branchID, ref.CommitID, err)
}

// update max_commit to infinite
_, err = tx.Exec(`UPDATE catalog_entries SET max_commit = $1 WHERE branch_id = $2 AND max_commit >= $3 AND NOT max_commit = $1`,
MaxCommitID, branchID, ref.CommitID)
if err != nil {
return nil, fmt.Errorf("clear entries %d, max commit %d: %w", branchID, ref.CommitID, err)
}
return nil, nil
}, c.txOpts(ctx, db.ReadCommitted())...)
return err
}
133 changes: 133 additions & 0 deletions catalog/mvcc/cataloger_rollback_commit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package mvcc

import (
"context"
"testing"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/testutil"
)

func TestCataloger_RollbackCommit_Basic(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repo", "master")

files := []string{"file1", "file2", "file3"}
var refs []string
// commit 3 files
for _, filename := range files {
testCatalogerCreateEntry(t, ctx, c, repository, "master", filename, nil, "")

commitLog, err := c.Commit(ctx, repository, "master", "first", "tester", nil)
testutil.MustDo(t, "first commit", err)

refs = append(refs, commitLog.Reference)
}
if refs == nil || len(refs) != 3 {
t.Fatalf("expected 3 references for 3 commits, got %d", len(refs))
}

// rollback to each reference and check all files are there
for i := 0; i < len(refs); i++ {
filesCount := len(refs) - i
ref := refs[filesCount-1]
err := c.RollbackCommit(ctx, repository, "master", ref)
testutil.MustDo(t, "rollback", err)

entries, _, err := c.ListEntries(ctx, repository, "master", "", "", "", -1)
testutil.MustDo(t, "list entries", err)
if len(entries) != filesCount {
t.Fatalf("List entries length after revert %d, expected %d", len(entries), filesCount)
}
for i := 0; i < filesCount; i++ {
if entries[i].Path != files[i] {
t.Fatalf("List entries after revert, file at index %d: %s, expected %s", i, entries[i].Path, files[i])
}
}
}

// check there are no commits
commits, _, err := c.ListCommits(ctx, repository, "master", "", -1)
testutil.MustDo(t, "list commits", err)
const expectedCommitsLen = 3 // branch + repo + first commit
if len(commits) != expectedCommitsLen {
t.Fatalf("List commits len=%d, expected=%d", len(commits), expectedCommitsLen)
}
}

func TestCataloger_RollbackCommit_BlockedByBranch(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repo", "master")

// get first commit
masterReference, err := c.GetBranchReference(ctx, repository, "master")
testutil.MustDo(t, "getting master branch reference", err)

// create a branch
_, err = c.CreateBranch(ctx, repository, "branch1", "master")
testutil.MustDo(t, "create branch1", err)

// commit new data to master
testCatalogerCreateEntry(t, ctx, c, repository, "master", "fileX", nil, "")
_, err = c.Commit(ctx, repository, "master", "commit file x", "tester", nil)
testutil.MustDo(t, "commit file x", err)

// merge changes into the branch1
_, err = c.Merge(ctx, repository, "master", "branch1", "tester", "sync file x", nil)
testutil.MustDo(t, "merge master to branch1", err)

// rollback to initial commit should fail
err = c.RollbackCommit(ctx, repository, "master", masterReference)
if err == nil {
t.Fatal("Rollback with blocked branch should fail with error")
}
}

func TestCataloger_RollbackCommit_AfterMerge(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repo", "master")

// create and commit a file - fileFile
filenames := []string{"file1", "file2"}
for _, filename := range filenames {
testCatalogerCreateEntry(t, ctx, c, repository, "master", filename, nil, "")
}
firstCommit, err := c.Commit(ctx, repository, "master", "first file", "tester", nil)
testutil.MustDo(t, "first commit", err)

// create a branch and commit some changes - add, delete, modify and commit
_, err = c.CreateBranch(ctx, repository, "branch1", "master")
testutil.MustDo(t, "create branch1", err)
// update file1 on branch1
testCatalogerCreateEntry(t, ctx, c, repository, "branch1", "file1", nil, "branch1")
// delete file2 on branch1
err = c.DeleteEntry(ctx, repository, "branch1", "file2")
testutil.MustDo(t, "delete file2", err)
// add file2 on branch1
testCatalogerCreateEntry(t, ctx, c, repository, "branch1", "file2", nil, "branch1")
// commit changes
_, err = c.Commit(ctx, repository, "branch1", "tester", "changes", nil)
testutil.MustDo(t, "commit changes to branch1", err)

// merge changes from branch1 to master
_, err = c.Merge(ctx, repository, "branch1", "master", "tester", "sync branch1 to master", nil)
testutil.MustDo(t, "merge branch1 to master", err)

// rollback to first commit
err = c.RollbackCommit(ctx, repository, "master", firstCommit.Reference)
testutil.MustDo(t, "rollback to first commit", err)

// check we have our original files
for _, filename := range filenames {
ent, err := c.GetEntry(ctx, repository, "master", filename, catalog.GetEntryParams{})
testutil.MustDo(t, filename+" get should work", err)

expectedChecksum := testCreateEntryCalcChecksum(filename, t.Name(), "")
if expectedChecksum != ent.Checksum {
t.Fatalf("Entry file1 after revert checksum %s, expected %s", ent.Checksum, expectedChecksum)
}
}
}
2 changes: 1 addition & 1 deletion catalog/rocks/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (c *cataloger) ListCommits(ctx context.Context, repository string, branch s
panic("not implemented") // TODO: Implement
}

func (c *cataloger) RollbackCommit(ctx context.Context, repository string, reference string) error {
func (c *cataloger) RollbackCommit(ctx context.Context, repository string, branch string, reference string) error {
panic("not implemented") // TODO: Implement
}

Expand Down
28 changes: 28 additions & 0 deletions forest/sstable/sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,31 @@ type Writer interface {
// Close flushes all entries to the disk and returns the WriteResult.
Close() (*WriteResult, error)
}

// BatchWriterCloser collects sstable writers and handles the asynchronous
// flushing and closing of the writers.
// Example usage:
// func batch(manager Manager, bwc BatchWriterCloser) {
// w1, _ := manager.GetWriter()
// _ = w1.WriteEntry(rocks.EntryRecord{Path: "foo1", Entry: &rocks.Entry{Address: "bar1"}})
// _ = w1.WriteEntry(rocks.EntryRecord{Path: "foo2", Entry: &rocks.Entry{Address: "bar2"}})
// _ = bwc.CloseWriterAsync(w1)

// w2, _ := manager.GetWriter()
// _ = w2.WriteEntry(rocks.EntryRecord{Path: "goo1", Entry: &rocks.Entry{Address: "baz1"}})
// _ = bwc.CloseWriterAsync(w2)

// // blocks until all writers finished or any writer failed
// res, err := bwc.Wait()
// // handle err, results, etc..
// }
type BatchWriterCloser interface {
// CloseWriterAsync adds Writer instance for the BatchWriterCloser to handle.
// Any writes executed to the writer after this call are not guaranteed to succeed.
// If Wait() has already been called, returns an error.
CloseWriterAsync(Writer) error

// Wait returns when all Writers finished.
// Any failure to close a single Writer will return with a nil results slice and an error.
Wait() ([]WriteResult, error)
}
2 changes: 2 additions & 0 deletions gateway/playback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (m *mockCollector) CollectMetadata(accountMetadata *stats.Metadata) {}

func (m *mockCollector) CollectEvent(class, action string) {}

func (m *mockCollector) SetInstallationID(_ string) {}

func getBasicHandlerPlayback(t *testing.T) (http.Handler, *dependencies) {
authService := newGatewayAuthFromFile(t, simulator.PlaybackParams.RecordingDir)
return getBasicHandler(t, authService)
Expand Down
2 changes: 2 additions & 0 deletions loadtest/local_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func (m *mockCollector) CollectMetadata(_ *stats.Metadata) {}

func (m *mockCollector) CollectEvent(_, _ string) {}

func (m *mockCollector) SetInstallationID(_ string) {}

func TestLocalLoad(t *testing.T) {
if testing.Short() {
t.Skip("Skipping loadtest tests in short mode")
Expand Down
5 changes: 5 additions & 0 deletions stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
type Collector interface {
CollectEvent(class, action string)
CollectMetadata(accountMetadata *Metadata)
SetInstallationID(installationID string)
}

type Metric struct {
Expand Down Expand Up @@ -218,6 +219,10 @@ func (s *BufferedCollector) collectHeartbeat(ctx context.Context) {
}
}

func (s *BufferedCollector) SetInstallationID(installationID string) {
s.installationID = installationID
}

func getBufferedCollectorArgs(c *config.Config) (processID string, opts []BufferedCollectorOpts) {
if c == nil {
return "", nil
Expand Down

0 comments on commit add7902

Please sign in to comment.