Skip to content

Commit

Permalink
Committing an object deletion to master erases object from all commit…
Browse files Browse the repository at this point in the history
…s to master - #997 (#1000)

* SSTable interface for committed data

* Remove unnecessary funcs from iterator

* added test for merge with zero changes

* initial commit of tree

* Change iterator to match the existing scanners interfaces

* initial commit of tree

* initial commit of tree

* initial commit for rocks catalog interface

* sstable initial implementation without repo seperation

* initial catalog interface

* initial catalog interface

* initial catalog interface

* Update catalog/rocks/catalog.go

Co-authored-by: itaiad200 <itaiad200@gmail.com>

* iterator interface

* diff type comment

* continue talking about StagingManager

* initial commit of tree

* just save changes - initial

* working on apply

* working on apply

* working on apply

* sstable interface

* changes

* fix to entry disappearing from a commit listing when it was deleted in a later commit

* Update catalog/mvcc/cataloger_delete_entry_test.go

Co-authored-by: arielshaqed <ariels@treeverse.io>

* compare with get entry with catalog entry not found

* changes after review

Co-authored-by: Itai Admi <itaiad200@gmail.com>
Co-authored-by: guyhardonag <guy.hardonag@treeverse.io>
Co-authored-by: guy-har <60321938+guy-har@users.noreply.github.com>
Co-authored-by: Barak Amar <barak.amar@treeverse.io>
Co-authored-by: arielshaqed <ariels@treeverse.io>
  • Loading branch information
6 people authored Dec 6, 2020
1 parent cf041f8 commit 99e89ec
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 17 deletions.
93 changes: 93 additions & 0 deletions catalog/mvcc/cataloger_delete_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"testing"

"github.com/treeverse/lakefs/testutil"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/db"
)
Expand Down Expand Up @@ -120,3 +122,94 @@ func testDeleteEntryCommitAndExpectNotFound(t *testing.T, ctx context.Context, c
t.Fatalf("DeleteEntry() get entry err = %s, want = %s", err, wantErr)
}
}

func TestCataloger_DeleteEntryAndCheckItRemainsInCommits(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repository", "master")
if err := c.CreateEntry(ctx, repository, "master", catalog.Entry{
Path: "/file2",
Checksum: "ff",
PhysicalAddress: "/addr2",
Size: 2,
Metadata: nil,
}, catalog.CreateEntryParams{}); err != nil {
t.Fatal("create entry for delete entry test:", err)
}
prevCommit, err := c.Commit(ctx, repository, "master", "commit before deletion test failed ", "tester", nil)
if err != nil {
t.Fatal("Failed to commit before expect not found:", err)
}
entry, err := c.GetEntry(ctx, repository, prevCommit.Reference, "/file2", catalog.GetEntryParams{})
_ = entry
err = c.DeleteEntry(ctx, repository, "master", "/file2")
if err != nil {
t.Fatal("delete failed: ", err)
}
nextCommit, err := c.Commit(ctx, repository, "master", "commit after deletion ", "tester", nil)
if err != nil {
t.Fatal("Failed to commit after delete:", err)
}
entry, err = c.GetEntry(ctx, repository, nextCommit.Reference, "/file2", catalog.GetEntryParams{})
entry, err = c.GetEntry(ctx, repository, prevCommit.Reference, "/file2", catalog.GetEntryParams{})
list, _, err := c.ListEntries(ctx, repository, nextCommit.Reference, "", "", "", 1000)
if len(list) != 0 {
t.Fatal("list entries returned deleted object")
}
list, _, err = c.ListEntries(ctx, repository, prevCommit.Reference, "", "", "", 1000)
if len(list) != 1 {
t.Fatal("list entries by commitID did not return deleted object from next commit")
}
list, _, err = c.ListEntries(ctx, repository, nextCommit.Reference, "", "", "/", 1000)
if len(list) != 0 {
t.Fatal("list entries by prefix returned deleted object")
}
list, _, err = c.ListEntries(ctx, repository, prevCommit.Reference, "", "", "/", 1000)
if len(list) != 1 {
t.Fatal("list entries by prefix on commitID did not return deleted object from next commit")
}
}

func TestCataloger_DeleteEntryVerifyExisting(t *testing.T) {
ctx := context.Background()
c := testCataloger(t)
repository := testCatalogerRepo(t, ctx, c, "repository", "master")

testCatalogerCreateEntry(t, ctx, c, repository, "master", "file1", nil, "")
commit1, err := c.Commit(ctx, repository, "master", "add file1", "committer", nil)
testutil.MustDo(t, "commit add file1", err)

_, err = c.CreateBranch(ctx, repository, "branch1", "master")
testutil.MustDo(t, "create branch1", err)

err = c.DeleteEntry(ctx, repository, "master", "file1")
testutil.MustDo(t, "delete file1", err)

_, err = c.Commit(ctx, repository, "master", "delete file1", "committer", nil)
testutil.MustDo(t, "commit delete file1", err)

// check file exists using reference, branch and listing
_, err = c.GetEntry(ctx, repository, commit1.Reference, "file1", catalog.GetEntryParams{})
testutil.MustDo(t, "get file1 by ref", err)

_, err = c.GetEntry(ctx, repository, "branch1", "file1", catalog.GetEntryParams{})
testutil.MustDo(t, "get file1 by branch", err)

entriesRef, _, err := c.ListEntries(ctx, repository, commit1.Reference, "", "", "", -1)
testutil.MustDo(t, "list using ref", err)
if len(entriesRef) != 1 {
t.Error("ListEntries of ref before delete should include a file")
}

entriesBranch, _, err := c.ListEntries(ctx, repository, "branch1", "", "", "", -1)
testutil.MustDo(t, "list using branch1", err)
if len(entriesBranch) != 1 {
t.Error("ListEntries of branch before delete should include a file")
}

// check the file is deleted on master
_, err = c.GetEntry(ctx, repository, "master", "file1", catalog.GetEntryParams{})
if !errors.Is(err, catalog.ErrEntryNotFound) {
t.Error("GetEntry should return not found on master branch:", err)
}
}
16 changes: 5 additions & 11 deletions catalog/mvcc/cataloger_list_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,30 +286,24 @@ func findLowestResultInBranches(branchRanges map[int64][]entryPathPrefixInfo, br
func buildBaseLevelQuery(baseBranchID int64, lineage []lineageCommit, branchEntryLimit int,
topCommitID CommitID, prefixLen int, endOfPrefixRange string) map[int64]sq.SelectBuilder {
unionMap := make(map[int64]sq.SelectBuilder)
unionMap[baseBranchID] = selectSingleBranch(baseBranchID, true, branchEntryLimit, topCommitID, prefixLen, endOfPrefixRange)
unionMap[baseBranchID] = buildSingleBranchQuery(baseBranchID, branchEntryLimit, topCommitID, prefixLen, endOfPrefixRange)
for _, l := range lineage {
unionMap[l.BranchID] = selectSingleBranch(l.BranchID, false, branchEntryLimit, l.CommitID, prefixLen, endOfPrefixRange)
unionMap[l.BranchID] = buildSingleBranchQuery(l.BranchID, branchEntryLimit, l.CommitID, prefixLen, endOfPrefixRange)
}
return unionMap
}

func selectSingleBranch(branchID int64, isBaseBranch bool, branchBatchSize int, topCommitID CommitID, prefixLen int, endOfPrefixRange string) sq.SelectBuilder {
rawSelect := sq.Select("branch_id", "min_commit").
func buildSingleBranchQuery(branchID int64, branchBatchSize int, topCommitID CommitID, prefixLen int, endOfPrefixRange string) sq.SelectBuilder {
query := sq.Select("branch_id", "min_commit").
Distinct().Options(" ON (branch_id,path)").
Column("substr(path,?) as path_suffix", prefixLen+1).
Column("CASE WHEN max_commit >= ? THEN ? ELSE max_commit END AS max_commit", topCommitID, MaxCommitID).
From("catalog_entries").
Where("branch_id = ?", branchID).
Where("min_commit <= ?", topCommitID).
Where("path < ?", endOfPrefixRange).
OrderBy("branch_id", "path", "min_commit desc").
Limit(uint64(branchBatchSize))
var query sq.SelectBuilder
if isBaseBranch {
query = rawSelect.Column("max_commit")
} else {
query = rawSelect.
Column("CASE WHEN max_commit >= ? THEN ? ELSE max_commit END AS max_commit", topCommitID, MaxCommitID)
}
return query
}

Expand Down
17 changes: 11 additions & 6 deletions catalog/mvcc/views.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mvcc

import (
"fmt"
"strconv"
"unicode/utf8"

Expand All @@ -14,11 +13,17 @@ const (
)

func sqEntriesV(requestedCommit CommitID) sq.SelectBuilder {
entriesQ := sq.Select("*",
fmt.Sprintf("min_commit != %d AS is_committed", MinCommitUncommittedIndicator),
"max_commit = 0 AS is_tombstone",
"ctid AS entry_ctid\n",
fmt.Sprintf("max_commit < %d AS is_deleted", MaxCommitID)).
var actualCommit CommitID
if requestedCommit == UncommittedID || requestedCommit == CommittedID {
actualCommit = MaxCommitID
} else {
actualCommit = requestedCommit
}
entriesQ := sq.Select("*").
Column("min_commit != ? AS is_committed", MinCommitUncommittedIndicator).
Column("max_commit = 0 AS is_tombstone").
Column("ctid AS entry_ctid\n").
Column("max_commit < ? AS is_deleted", actualCommit).
From("catalog_entries")
switch requestedCommit {
case UncommittedID: // no further filtering is required
Expand Down

0 comments on commit 99e89ec

Please sign in to comment.