From 99e89ece78db6c622c3015fd2f84e24f2e3116a9 Mon Sep 17 00:00:00 2001 From: tzahij Date: Sun, 6 Dec 2020 16:07:47 +0200 Subject: [PATCH] Committing an object deletion to master erases object from all commits to master - #997 (#1000) * SSTable interface for committed data * Remove unnecessary funcs from iterator * added test for merge with zero changes * initial commit of tree * Change iterator to match the existing scanners interfaces * initial commit of tree * initial commit of tree * initial commit for rocks catalog interface * sstable initial implementation without repo separation * initial catalog interface * initial catalog interface * initial catalog interface * Update catalog/rocks/catalog.go Co-authored-by: itaiad200 * iterator interface * diff type comment * continue talking about StagingManager * initial commit of tree * just save changes - initial * working on apply * working on apply * working on apply * sstable interface * changes * fix to entry disappearing from a commit listing when it was deleted in a later commit * Update catalog/mvcc/cataloger_delete_entry_test.go Co-authored-by: arielshaqed * compare with get entry with catalog entry not found * changes after review Co-authored-by: Itai Admi Co-authored-by: guyhardonag Co-authored-by: guy-har <60321938+guy-har@users.noreply.github.com> Co-authored-by: Barak Amar Co-authored-by: arielshaqed --- catalog/mvcc/cataloger_delete_entry_test.go | 93 +++++++++++++++++++++ catalog/mvcc/cataloger_list_entries.go | 16 ++-- catalog/mvcc/views.go | 17 ++-- 3 files changed, 109 insertions(+), 17 deletions(-) diff --git a/catalog/mvcc/cataloger_delete_entry_test.go b/catalog/mvcc/cataloger_delete_entry_test.go index 8e0c47a5806..757485d71be 100644 --- a/catalog/mvcc/cataloger_delete_entry_test.go +++ b/catalog/mvcc/cataloger_delete_entry_test.go @@ -5,6 +5,8 @@ import ( "errors" "testing" + "github.com/treeverse/lakefs/testutil" + "github.com/treeverse/lakefs/catalog" "github.com/treeverse/lakefs/db" ) @@ 
-120,3 +122,94 @@ func testDeleteEntryCommitAndExpectNotFound(t *testing.T, ctx context.Context, c t.Fatalf("DeleteEntry() get entry err = %s, want = %s", err, wantErr) } } + +func TestCataloger_DeleteEntryAndCheckItRemainsInCommits(t *testing.T) { + ctx := context.Background() + c := testCataloger(t) + repository := testCatalogerRepo(t, ctx, c, "repository", "master") + if err := c.CreateEntry(ctx, repository, "master", catalog.Entry{ + Path: "/file2", + Checksum: "ff", + PhysicalAddress: "/addr2", + Size: 2, + Metadata: nil, + }, catalog.CreateEntryParams{}); err != nil { + t.Fatal("create entry for delete entry test:", err) + } + prevCommit, err := c.Commit(ctx, repository, "master", "commit before deletion test failed ", "tester", nil) + if err != nil { + t.Fatal("Failed to commit before expect not found:", err) + } + entry, err := c.GetEntry(ctx, repository, prevCommit.Reference, "/file2", catalog.GetEntryParams{}) + _ = entry + err = c.DeleteEntry(ctx, repository, "master", "/file2") + if err != nil { + t.Fatal("delete failed: ", err) + } + nextCommit, err := c.Commit(ctx, repository, "master", "commit after deletion ", "tester", nil) + if err != nil { + t.Fatal("Failed to commit after delete:", err) + } + entry, err = c.GetEntry(ctx, repository, nextCommit.Reference, "/file2", catalog.GetEntryParams{}) + entry, err = c.GetEntry(ctx, repository, prevCommit.Reference, "/file2", catalog.GetEntryParams{}) + list, _, err := c.ListEntries(ctx, repository, nextCommit.Reference, "", "", "", 1000) + if len(list) != 0 { + t.Fatal("list entries returned deleted object") + } + list, _, err = c.ListEntries(ctx, repository, prevCommit.Reference, "", "", "", 1000) + if len(list) != 1 { + t.Fatal("list entries by commitID did not return deleted object from next commit") + } + list, _, err = c.ListEntries(ctx, repository, nextCommit.Reference, "", "", "/", 1000) + if len(list) != 0 { + t.Fatal("list entries by prefix returned deleted object") + } + list, _, err = 
c.ListEntries(ctx, repository, prevCommit.Reference, "", "", "/", 1000) + if len(list) != 1 { + t.Fatal("list entries by prefix on commitID did not return deleted object from next commit") + } +} + +func TestCataloger_DeleteEntryVerifyExisting(t *testing.T) { + ctx := context.Background() + c := testCataloger(t) + repository := testCatalogerRepo(t, ctx, c, "repository", "master") + + testCatalogerCreateEntry(t, ctx, c, repository, "master", "file1", nil, "") + commit1, err := c.Commit(ctx, repository, "master", "add file1", "committer", nil) + testutil.MustDo(t, "commit add file1", err) + + _, err = c.CreateBranch(ctx, repository, "branch1", "master") + testutil.MustDo(t, "create branch1", err) + + err = c.DeleteEntry(ctx, repository, "master", "file1") + testutil.MustDo(t, "delete file1", err) + + _, err = c.Commit(ctx, repository, "master", "delete file1", "committer", nil) + testutil.MustDo(t, "commit delete file1", err) + + // check file exists using reference, branch and listing + _, err = c.GetEntry(ctx, repository, commit1.Reference, "file1", catalog.GetEntryParams{}) + testutil.MustDo(t, "get file1 by ref", err) + + _, err = c.GetEntry(ctx, repository, "branch1", "file1", catalog.GetEntryParams{}) + testutil.MustDo(t, "get file1 by branch", err) + + entriesRef, _, err := c.ListEntries(ctx, repository, commit1.Reference, "", "", "", -1) + testutil.MustDo(t, "list using ref", err) + if len(entriesRef) != 1 { + t.Error("ListEntries of ref before delete should include a file") + } + + entriesBranch, _, err := c.ListEntries(ctx, repository, "branch1", "", "", "", -1) + testutil.MustDo(t, "list using branch1", err) + if len(entriesBranch) != 1 { + t.Error("ListEntries of branch before delete should include a file") + } + + // check the file is deleted on master + _, err = c.GetEntry(ctx, repository, "master", "file1", catalog.GetEntryParams{}) + if !errors.Is(err, catalog.ErrEntryNotFound) { + t.Error("GetEntry should return not found on master branch:", err) + } 
+} diff --git a/catalog/mvcc/cataloger_list_entries.go b/catalog/mvcc/cataloger_list_entries.go index 1a1b3d4ad2d..a95d2425ccb 100644 --- a/catalog/mvcc/cataloger_list_entries.go +++ b/catalog/mvcc/cataloger_list_entries.go @@ -286,30 +286,24 @@ func findLowestResultInBranches(branchRanges map[int64][]entryPathPrefixInfo, br func buildBaseLevelQuery(baseBranchID int64, lineage []lineageCommit, branchEntryLimit int, topCommitID CommitID, prefixLen int, endOfPrefixRange string) map[int64]sq.SelectBuilder { unionMap := make(map[int64]sq.SelectBuilder) - unionMap[baseBranchID] = selectSingleBranch(baseBranchID, true, branchEntryLimit, topCommitID, prefixLen, endOfPrefixRange) + unionMap[baseBranchID] = buildSingleBranchQuery(baseBranchID, branchEntryLimit, topCommitID, prefixLen, endOfPrefixRange) for _, l := range lineage { - unionMap[l.BranchID] = selectSingleBranch(l.BranchID, false, branchEntryLimit, l.CommitID, prefixLen, endOfPrefixRange) + unionMap[l.BranchID] = buildSingleBranchQuery(l.BranchID, branchEntryLimit, l.CommitID, prefixLen, endOfPrefixRange) } return unionMap } -func selectSingleBranch(branchID int64, isBaseBranch bool, branchBatchSize int, topCommitID CommitID, prefixLen int, endOfPrefixRange string) sq.SelectBuilder { - rawSelect := sq.Select("branch_id", "min_commit"). +func buildSingleBranchQuery(branchID int64, branchBatchSize int, topCommitID CommitID, prefixLen int, endOfPrefixRange string) sq.SelectBuilder { + query := sq.Select("branch_id", "min_commit"). Distinct().Options(" ON (branch_id,path)"). Column("substr(path,?) as path_suffix", prefixLen+1). + Column("CASE WHEN max_commit >= ? THEN ? ELSE max_commit END AS max_commit", topCommitID, MaxCommitID). From("catalog_entries"). Where("branch_id = ?", branchID). Where("min_commit <= ?", topCommitID). Where("path < ?", endOfPrefixRange). OrderBy("branch_id", "path", "min_commit desc"). 
Limit(uint64(branchBatchSize)) - var query sq.SelectBuilder - if isBaseBranch { - query = rawSelect.Column("max_commit") - } else { - query = rawSelect. - Column("CASE WHEN max_commit >= ? THEN ? ELSE max_commit END AS max_commit", topCommitID, MaxCommitID) - } return query } diff --git a/catalog/mvcc/views.go b/catalog/mvcc/views.go index 5ca1ce5cf02..58294df29c9 100644 --- a/catalog/mvcc/views.go +++ b/catalog/mvcc/views.go @@ -1,7 +1,6 @@ package mvcc import ( - "fmt" "strconv" "unicode/utf8" @@ -14,11 +13,17 @@ const ( ) func sqEntriesV(requestedCommit CommitID) sq.SelectBuilder { - entriesQ := sq.Select("*", - fmt.Sprintf("min_commit != %d AS is_committed", MinCommitUncommittedIndicator), - "max_commit = 0 AS is_tombstone", - "ctid AS entry_ctid\n", - fmt.Sprintf("max_commit < %d AS is_deleted", MaxCommitID)). + var actualCommit CommitID + if requestedCommit == UncommittedID || requestedCommit == CommittedID { + actualCommit = MaxCommitID + } else { + actualCommit = requestedCommit + } + entriesQ := sq.Select("*"). + Column("min_commit != ? AS is_committed", MinCommitUncommittedIndicator). + Column("max_commit = 0 AS is_tombstone"). + Column("ctid AS entry_ctid\n"). + Column("max_commit < ? AS is_deleted", actualCommit). From("catalog_entries") switch requestedCommit { case UncommittedID: // no further filtering is required