Skip to content

Commit

Permalink
code review changes - move more types into mvcc
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder committed Nov 22, 2020
1 parent de0b3e0 commit f3efa47
Show file tree
Hide file tree
Showing 53 changed files with 343 additions and 348 deletions.
21 changes: 0 additions & 21 deletions catalog/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,3 @@ func (j *Metadata) Scan(src interface{}) error {
}
return json.Unmarshal(data, j)
}

type MinMaxCommit struct {
MinCommit CommitID `db:"min_commit"`
MaxCommit CommitID `db:"max_commit"`
}

func (m MinMaxCommit) IsDeleted() bool {
return m.MaxCommit != MaxCommitID
}
func (m MinMaxCommit) IsTombstone() bool {
return m.MaxCommit == TombstoneCommitID
}

func (m MinMaxCommit) IsCommitted() bool {
return m.MinCommit != MaxCommitID
}

func (m MinMaxCommit) ChangedAfterCommit(commitID CommitID) bool {
// needed for diff, to check if an entry changed after the lineage commit id
return m.MinCommit > commitID || (m.IsDeleted() && m.MaxCommit >= commitID)
}
20 changes: 10 additions & 10 deletions catalog/mvcc/cataloger_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
)

func (c *cataloger) Commit(ctx context.Context, repository, branch string, message string, committer string, metadata catalog.Metadata) (*catalog.CommitLog, error) {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "branch", IsValid: catalog.ValidateBranchName(branch)},
{Name: "message", IsValid: catalog.ValidateCommitMessage(message)},
{Name: "committer", IsValid: catalog.ValidateCommitter(committer)},
if err := Validate(ValidateFields{
{Name: "branch", IsValid: ValidateBranchName(branch)},
{Name: "message", IsValid: ValidateCommitMessage(message)},
{Name: "committer", IsValid: ValidateCommitter(committer)},
}); err != nil {
return nil, err
}
Expand Down Expand Up @@ -65,8 +65,8 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa
return nil, err
}

reference := catalog.MakeReference(branch, commitID)
parentReference := catalog.MakeReference(branch, lastCommitID)
reference := MakeReference(branch, commitID)
parentReference := MakeReference(branch, lastCommitID)
commitLog := &catalog.CommitLog{
Committer: committer,
Message: message,
Expand All @@ -92,19 +92,19 @@ func (c *cataloger) Commit(ctx context.Context, repository, branch string, messa
return res.(*catalog.CommitLog), nil
}

func commitUpdateCommittedEntriesWithMaxCommit(tx db.Tx, branchID int64, commitID catalog.CommitID) (int64, error) {
func commitUpdateCommittedEntriesWithMaxCommit(tx db.Tx, branchID int64, commitID CommitID) (int64, error) {
res, err := tx.Exec(`UPDATE catalog_entries_v SET max_commit = $2
WHERE branch_id = $1 AND is_committed
AND max_commit = $3
AND path in (SELECT path FROM catalog_entries_v WHERE branch_id = $1 AND NOT is_committed)`,
branchID, commitID, catalog.MaxCommitID)
branchID, commitID, MaxCommitID)
if err != nil {
return 0, err
}
return res.RowsAffected(), nil
}

func commitDeleteUncommittedTombstones(tx db.Tx, branchID int64, commitID catalog.CommitID) (int64, error) {
func commitDeleteUncommittedTombstones(tx db.Tx, branchID int64, commitID CommitID) (int64, error) {
res, err := tx.Exec(`DELETE FROM catalog_entries_v WHERE branch_id = $1 AND NOT is_committed AND is_tombstone AND path IN (
SELECT path FROM catalog_entries_v WHERE branch_id = $1 AND is_committed AND max_commit = $2)`,
branchID, commitID)
Expand All @@ -114,7 +114,7 @@ func commitDeleteUncommittedTombstones(tx db.Tx, branchID int64, commitID catalo
return res.RowsAffected(), nil
}

func commitEntries(tx db.Tx, branchID int64, commitID catalog.CommitID) (int64, error) {
func commitEntries(tx db.Tx, branchID int64, commitID CommitID) (int64, error) {
res, err := tx.Exec(`UPDATE catalog_entries_v SET min_commit = $2 WHERE branch_id = $1 AND NOT is_committed`,
branchID, commitID)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions catalog/mvcc/cataloger_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestCataloger_Commit_Scenario(t *testing.T) {

t.Run("same file more than once", func(t *testing.T) {
repository := testCatalogerRepo(t, ctx, c, "repository", "master")
var previousCommitID catalog.CommitID
var previousCommitID CommitID
for i := 0; i < 3; i++ {
if err := c.CreateEntry(ctx, repository, "master", catalog.Entry{
Path: "/file1",
Expand All @@ -151,7 +151,7 @@ func TestCataloger_Commit_Scenario(t *testing.T) {
}

// parse commit log and check that the commit id goes up
r, err := catalog.ParseRef(commitLog.Reference)
r, err := ParseRef(commitLog.Reference)
testutil.Must(t, err)
if r.CommitID <= previousCommitID {
t.Fatalf("Commit ID should go up - %d, previous was %d", r.CommitID, previousCommitID)
Expand All @@ -169,7 +169,7 @@ func TestCataloger_Commit_Scenario(t *testing.T) {

t.Run("file per commit", func(t *testing.T) {
repository := testCatalogerRepo(t, ctx, c, "repository", "master")
var previousCommitID catalog.CommitID
var previousCommitID CommitID
for i := 0; i < 3; i++ {
fileName := fmt.Sprintf("/file%d", i+1)
addrName := fmt.Sprintf("/addr%d", i+1)
Expand All @@ -189,7 +189,7 @@ func TestCataloger_Commit_Scenario(t *testing.T) {
}

// check that commit id goes up
ref, err := catalog.ParseRef(commitLog.Reference)
ref, err := ParseRef(commitLog.Reference)
testutil.Must(t, err)
if ref.CommitID <= previousCommitID {
t.Fatalf("Commit new commit ID %d, should go up - previous %d", ref.CommitID, previousCommitID)
Expand Down
18 changes: 9 additions & 9 deletions catalog/mvcc/cataloger_create_branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ const (
)

func (c *cataloger) CreateBranch(ctx context.Context, repository, branch string, sourceBranch string) (*catalog.CommitLog, error) {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "branch", IsValid: catalog.ValidateBranchName(branch)},
{Name: "sourceBranch", IsValid: catalog.ValidateBranchName(sourceBranch)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "branch", IsValid: ValidateBranchName(branch)},
{Name: "sourceBranch", IsValid: ValidateBranchName(sourceBranch)},
}); err != nil {
return nil, err
}
Expand Down Expand Up @@ -54,9 +54,9 @@ func (c *cataloger) CreateBranch(ctx context.Context, repository, branch string,
}

insertReturns := struct {
CommitID catalog.CommitID `db:"commit_id"`
MergeSourceCommit catalog.CommitID `db:"merge_source_commit"`
TransactionTimestamp time.Time `db:"transaction_timestamp"`
CommitID CommitID `db:"commit_id"`
MergeSourceCommit CommitID `db:"merge_source_commit"`
TransactionTimestamp time.Time `db:"transaction_timestamp"`
}{}
commitMsg := fmt.Sprintf(createBranchCommitMessageFormat, branch, sourceBranch)
err = tx.Get(&insertReturns, `INSERT INTO catalog_commits (branch_id,commit_id,previous_commit_id,committer,message,
Expand All @@ -71,8 +71,8 @@ func (c *cataloger) CreateBranch(ctx context.Context, repository, branch string,
if err != nil {
return nil, fmt.Errorf("insert commit: %w", err)
}
reference := catalog.MakeReference(branch, insertReturns.CommitID)
parentReference := catalog.MakeReference(sourceBranch, insertReturns.MergeSourceCommit)
reference := MakeReference(branch, insertReturns.CommitID)
parentReference := MakeReference(sourceBranch, insertReturns.MergeSourceCommit)

commitLog := &catalog.CommitLog{
Committer: catalog.DefaultCommitter,
Expand Down
12 changes: 6 additions & 6 deletions catalog/mvcc/cataloger_create_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
// CreateEntries add multiple entries into the catalog, this process doesn't pass through de-dup mechanism.
// It is mainly used by import mass entries into the catalog.
func (c *cataloger) CreateEntries(ctx context.Context, repository, branch string, entries []catalog.Entry) error {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "branch", IsValid: catalog.ValidateBranchName(branch)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "branch", IsValid: ValidateBranchName(branch)},
}); err != nil {
return err
}
Expand All @@ -29,7 +29,7 @@ func (c *cataloger) CreateEntries(ctx context.Context, repository, branch string
entriesMap := make(map[string]*catalog.Entry, len(entries))
for i := len(entries) - 1; i >= 0; i-- {
p := entries[i].Path
if !catalog.IsNonEmptyString(p) {
if !IsNonEmptyString(p) {
return fmt.Errorf("entry at pos %d, path: %w", i, catalog.ErrInvalidValue)
}
entriesMap[p] = &entries[i]
Expand Down Expand Up @@ -66,10 +66,10 @@ func (c *cataloger) CreateEntries(ctx context.Context, repository, branch string
dbTime.Valid = true
}
sqInsert = sqInsert.Values(branchID, entry.Path, entry.PhysicalAddress, entry.Checksum, entry.Size, entry.Metadata,
sq.Expr("COALESCE(?,NOW())", dbTime), entry.Expired, catalog.MaxCommitID)
sq.Expr("COALESCE(?,NOW())", dbTime), entry.Expired, MaxCommitID)
}
query, args, err := sqInsert.Suffix(`ON CONFLICT (branch_id,path,min_commit)
DO UPDATE SET physical_address=EXCLUDED.physical_address, checksum=EXCLUDED.checksum, size=EXCLUDED.size, metadata=EXCLUDED.metadata, creation_date=EXCLUDED.creation_date, is_expired=EXCLUDED.is_expired, min_commit=EXCLUDED.min_commit, max_commit=?`, catalog.MaxCommitID).
DO UPDATE SET physical_address=EXCLUDED.physical_address, checksum=EXCLUDED.checksum, size=EXCLUDED.size, metadata=EXCLUDED.metadata, creation_date=EXCLUDED.creation_date, is_expired=EXCLUDED.is_expired, min_commit=EXCLUDED.min_commit, max_commit=?`, MaxCommitID).
ToSql()
if err != nil {
return nil, fmt.Errorf("build query: %w", err)
Expand Down
10 changes: 5 additions & 5 deletions catalog/mvcc/cataloger_create_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
)

func (c *cataloger) CreateEntry(ctx context.Context, repository, branch string, entry catalog.Entry, params catalog.CreateEntryParams) error {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "branch", IsValid: catalog.ValidateBranchName(branch)},
{Name: "path", IsValid: catalog.ValidatePath(entry.Path)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "branch", IsValid: ValidateBranchName(branch)},
{Name: "path", IsValid: ValidatePath(entry.Path)},
}); err != nil {
return err
}
Expand Down Expand Up @@ -58,7 +58,7 @@ func insertEntry(tx db.Tx, branchID int64, entry *catalog.Entry) (string, error)
ON CONFLICT (branch_id,path,min_commit)
DO UPDATE SET physical_address=EXCLUDED.physical_address, checksum=EXCLUDED.checksum, size=EXCLUDED.size, metadata=EXCLUDED.metadata, creation_date=EXCLUDED.creation_date, is_expired=EXCLUDED.is_expired, min_commit=EXCLUDED.min_commit, max_commit=$9
RETURNING ctid`,
branchID, entry.Path, entry.PhysicalAddress, entry.Checksum, entry.Size, entry.Metadata, dbTime, entry.Expired, catalog.MaxCommitID)
branchID, entry.Path, entry.PhysicalAddress, entry.Checksum, entry.Size, entry.Metadata, dbTime, entry.Expired, MaxCommitID)
if err != nil {
return "", fmt.Errorf("insert entry: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion catalog/mvcc/cataloger_create_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestCataloger_CreateEntry(t *testing.T) {
return
}
// in case there is no error - get the entry and compare
ref := catalog.MakeReference(tt.args.branch, catalog.UncommittedID)
ref := MakeReference(tt.args.branch, UncommittedID)
ent, err := c.GetEntry(ctx, tt.args.repository, ref, tt.args.path, catalog.GetEntryParams{})
testutil.MustDo(t, "get entry we just created", err)
if ent.Size != tt.args.size {
Expand Down
11 changes: 5 additions & 6 deletions catalog/mvcc/cataloger_create_multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import (
"context"
"time"

"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/db"
)

func (c *cataloger) CreateMultipartUpload(ctx context.Context, repository string, uploadID, path, physicalAddress string, creationTime time.Time) error {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "uploadID", IsValid: catalog.ValidateUploadID(uploadID)},
{Name: "path", IsValid: catalog.ValidatePath(path)},
{Name: "physicalAddress", IsValid: catalog.ValidatePhysicalAddress(physicalAddress)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "uploadID", IsValid: ValidateUploadID(uploadID)},
{Name: "path", IsValid: ValidatePath(path)},
{Name: "physicalAddress", IsValid: ValidatePhysicalAddress(physicalAddress)},
}); err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions catalog/mvcc/cataloger_create_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ const (
)

func (c *cataloger) CreateRepository(ctx context.Context, repository string, storageNamespace string, branch string) (*catalog.Repository, error) {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "storageNamespace", IsValid: catalog.ValidateStorageNamespace(storageNamespace)},
{Name: "branch", IsValid: catalog.ValidateBranchName(branch)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "storageNamespace", IsValid: ValidateStorageNamespace(storageNamespace)},
{Name: "branch", IsValid: ValidateBranchName(branch)},
}); err != nil {
return nil, err
}
Expand Down Expand Up @@ -60,7 +60,7 @@ func (c *cataloger) CreateRepository(ctx context.Context, repository string, sto
}

// create initial commit for import branch
var importCommitID catalog.CommitID
var importCommitID CommitID
err := tx.GetPrimitive(&importCommitID, `INSERT INTO catalog_commits (branch_id,commit_id,committer,message,creation_date,previous_commit_id)
VALUES ($1,nextval('catalog_commit_id_seq'),$2,$3,transaction_timestamp(),0)
RETURNING commit_id`,
Expand Down
6 changes: 3 additions & 3 deletions catalog/mvcc/cataloger_delete_branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
)

func (c *cataloger) DeleteBranch(ctx context.Context, repository, branch string) error {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "branch", IsValid: catalog.ValidateBranchName(branch)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "branch", IsValid: ValidateBranchName(branch)},
}); err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions catalog/mvcc/cataloger_delete_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
)

func (c *cataloger) DeleteEntry(ctx context.Context, repository, branch string, path string) error {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "branch", IsValid: catalog.ValidateBranchName(branch)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "branch", IsValid: ValidateBranchName(branch)},
}); err != nil {
return err
}
Expand All @@ -28,20 +28,20 @@ func (c *cataloger) DeleteEntry(ctx context.Context, repository, branch string,

// delete uncommitted entry, if found first
res, err := tx.Exec("DELETE FROM catalog_entries WHERE branch_id=$1 AND path=$2 AND min_commit=$3 AND max_commit=$4",
branchID, path, catalog.MinCommitUncommittedIndicator, catalog.MaxCommitID)
branchID, path, MinCommitUncommittedIndicator, MaxCommitID)
if err != nil {
return nil, fmt.Errorf("uncommitted: %w", err)
}
deletedUncommittedCount := res.RowsAffected()

// get uncommitted entry based on path
lineage, err := getLineage(tx, branchID, catalog.UncommittedID)
lineage, err := getLineage(tx, branchID, UncommittedID)
if err != nil {
return nil, fmt.Errorf("get lineage: %w", err)
}
sql, args, err := psql.
Select("is_committed").
FromSelect(sqEntriesLineage(branchID, catalog.UncommittedID, lineage), "entries").
FromSelect(sqEntriesLineage(branchID, UncommittedID, lineage), "entries").
// Expired objects *can* be successfully deleted!
Where(sq.Eq{"path": path, "is_deleted": false}).
ToSql()
Expand All @@ -61,7 +61,7 @@ func (c *cataloger) DeleteEntry(ctx context.Context, repository, branch string,
if isCommitted {
_, err = tx.Exec(`INSERT INTO catalog_entries (branch_id,path,physical_address,checksum,size,metadata,min_commit,max_commit)
VALUES ($1,$2,'','',0,'{}',$3,0)`,
branchID, path, catalog.MaxCommitID)
branchID, path, MaxCommitID)
if err != nil {
return nil, fmt.Errorf("tombstone: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions catalog/mvcc/cataloger_delete_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestCataloger_DeleteEntry(t *testing.T) {
}

func testDeleteEntryExpectNotFound(t *testing.T, ctx context.Context, c catalog.Cataloger, repository, branch string, path string) {
_, err := c.GetEntry(ctx, repository, catalog.MakeReference(branch, catalog.UncommittedID), path, catalog.GetEntryParams{})
_, err := c.GetEntry(ctx, repository, MakeReference(branch, UncommittedID), path, catalog.GetEntryParams{})
wantErr := db.ErrNotFound
if !errors.As(err, &wantErr) {
t.Fatalf("DeleteEntry() get entry err = %s, want = %s", err, wantErr)
Expand All @@ -114,7 +114,7 @@ func testDeleteEntryCommitAndExpectNotFound(t *testing.T, ctx context.Context, c
if err != nil {
t.Fatal("Failed to commit before expect not found:", err)
}
_, err = c.GetEntry(ctx, repository, branch+catalog.CommittedSuffix, path, catalog.GetEntryParams{})
_, err = c.GetEntry(ctx, repository, branch+CommittedSuffix, path, catalog.GetEntryParams{})
wantErr := db.ErrNotFound
if !errors.As(err, &wantErr) {
t.Fatalf("DeleteEntry() get entry err = %s, want = %s", err, wantErr)
Expand Down
6 changes: 3 additions & 3 deletions catalog/mvcc/cataloger_delete_multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
)

func (c *cataloger) DeleteMultipartUpload(ctx context.Context, repository string, uploadID string) error {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "uploadID", IsValid: catalog.ValidateUploadID(uploadID)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "uploadID", IsValid: ValidateUploadID(uploadID)},
}); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions catalog/mvcc/cataloger_delete_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
)

func (c *cataloger) DeleteRepository(ctx context.Context, repository string) error {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
}); err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions catalog/mvcc/cataloger_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ import (
const DiffMaxLimit = 1000

func (c *cataloger) Diff(ctx context.Context, repository string, leftReference string, rightReference string, params catalog.DiffParams) (catalog.Differences, bool, error) {
if err := catalog.Validate(catalog.ValidateFields{
{Name: "repository", IsValid: catalog.ValidateRepositoryName(repository)},
{Name: "leftReference", IsValid: catalog.ValidateReference(leftReference)},
{Name: "rightReference", IsValid: catalog.ValidateReference(rightReference)},
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
{Name: "leftReference", IsValid: ValidateReference(leftReference)},
{Name: "rightReference", IsValid: ValidateReference(rightReference)},
}); err != nil {
return nil, false, err
}

// parse references
leftRef, err := catalog.ParseRef(leftReference)
leftRef, err := ParseRef(leftReference)
if err != nil {
return nil, false, fmt.Errorf("left reference: %w", err)
}
rightRef, err := catalog.ParseRef(rightReference)
rightRef, err := ParseRef(rightReference)
if err != nil {
return nil, false, fmt.Errorf("right reference: %w", err)
}
Expand Down
Loading

0 comments on commit f3efa47

Please sign in to comment.