diff --git a/catalog/cataloger_diff.go b/catalog/cataloger_diff.go index 0031cff4062..5b7f9b98518 100644 --- a/catalog/cataloger_diff.go +++ b/catalog/cataloger_diff.go @@ -12,21 +12,51 @@ import ( "github.com/treeverse/lakefs/logging" ) -type contextKey string - -func (k contextKey) String() string { - return string(k) -} - const ( DiffMaxLimit = 1000 - diffResultsTableNamePrefix = "catalog_diff_results" - contextDiffResultsKey contextKey = "diff_results_key" + diffResultsTableNamePrefix = "catalog_diff_results" + diffResultsInsertBatchSize = 1024 + + contextDiffResultsKey contextKey = "diff_results_key" ) +type diffEffectiveCommits struct { + // ParentEffectiveCommit last commit parent merged from child. + // When no sync commit is found - set the commit ID to the point child's branch was created. + ParentEffectiveCommit CommitID `db:"parent_effective_commit"` + + // ChildEffectiveCommit last commit child merged from parent. + // If the child never synced with parent, the commit ID is set to 1. + ChildEffectiveCommit CommitID `db:"child_effective_commit"` + + // ParentEffectiveLineage lineage at the ParentEffectiveCommit + ParentEffectiveLineage []lineageCommit +} + +type contextKey string + +type diffResultRecord struct { + SourceBranch int64 + DiffType DifferenceType + Entry Entry // Partially filled. Path is always set. + EntryCtid *string // CTID of the modified/added entry. Do not use outside of catalog diff-by-iterators. https://github.com/treeverse/lakeFS/issues/831 +} + +type diffResultsBatchWriter struct { + tx db.Tx + DiffResultsTableName string + Records []*diffResultRecord +} + var ErrMissingDiffResultsIDInContext = errors.New("missing diff results id in context") +// Diff lists of differences between leftBranch and rightBranch. +// The second return value will be true if there are more results. Use the last entry's path as the next call to Diff in the 'after' argument. 
+// limit - the maximum number of differences to return; capped at DiffMaxLimit, which is also used when limit is negative or exceeds it. +// after - lookup entries whose path comes after this value. +// The Diff internal API produces a temporary table that this call deletes at the end of a successful transaction (a failed transaction rolls back the changes). +// The diff results table holds the ctid that references the relevant entry for changed/added entries and the source branch for deleted entries - information used later to apply the changes found. func (c *cataloger) Diff(ctx context.Context, repository string, leftBranch string, rightBranch string, limit int, after string) (Differences, bool, error) { if err := Validate(ValidateFields{ {Name: "repository", IsValid: ValidateRepositoryName(repository)}, @@ -39,6 +69,8 @@ func (c *cataloger) Diff(ctx context.Context, repository string, leftBranch stri if limit < 0 || limit > DiffMaxLimit { limit = DiffMaxLimit } + // we request one additional result (without returning it) for pagination (hasMore) + diffResultsLimit := limit + 1 ctx, cancel := c.withDiffResultsContext(ctx) defer cancel() @@ -52,11 +84,11 @@ func (c *cataloger) Diff(ctx context.Context, repository string, leftBranch stri if err != nil { return nil, fmt.Errorf("right branch: %w", err) } - err = c.doDiff(ctx, tx, leftID, rightID) + err = c.doDiff(ctx, tx, leftID, rightID, diffResultsLimit, after) if err != nil { return nil, err } - return getDiffDifferences(ctx, tx, limit+1, after) + return getDiffDifferences(ctx, tx, diffResultsLimit, after) }, c.txOpts(ctx)...) if err != nil { return nil, false, err @@ -66,22 +98,24 @@ func (c *cataloger) Diff(ctx context.Context, repository string, leftBranch stri return differences, hasMore, nil } -func (c *cataloger) doDiff(ctx context.Context, tx db.Tx, leftID, rightID int64) error { +// doDiff internal implementation of the actual diff. 
limit <0 will scan the complete branch +func (c *cataloger) doDiff(ctx context.Context, tx db.Tx, leftID, rightID int64, limit int, after string) error { relation, err := getBranchesRelationType(tx, leftID, rightID) if err != nil { return err } - return c.doDiffByRelation(ctx, tx, relation, leftID, rightID) + return c.doDiffByRelation(ctx, tx, relation, leftID, rightID, limit, after) } -func (c *cataloger) doDiffByRelation(ctx context.Context, tx db.Tx, relation RelationType, leftID, rightID int64) error { +// doDiffByRelation underlying diff between two branches, called by diff and merge +func (c *cataloger) doDiffByRelation(ctx context.Context, tx db.Tx, relation RelationType, leftID, rightID int64, limit int, after string) error { switch relation { case RelationTypeFromParent: - return c.diffFromParent(ctx, tx, leftID, rightID) + return c.diffFromParent(ctx, tx, leftID, rightID, limit, after) case RelationTypeFromChild: - return c.diffFromChild(ctx, tx, leftID, rightID) + return c.diffFromChild(ctx, tx, leftID, rightID, limit, after) case RelationTypeNotDirect: - return c.diffNonDirect(ctx, tx, leftID, rightID) + return c.diffNonDirect(ctx, tx, leftID, rightID, limit, after) default: c.log.WithFields(logging.Fields{ "relation_type": relation, @@ -92,6 +126,47 @@ func (c *cataloger) doDiffByRelation(ctx context.Context, tx db.Tx, relation Rel } } +func (k contextKey) String() string { + return string(k) +} + +func (c *diffEffectiveCommits) ParentEffectiveCommitByBranchID(branchID int64) CommitID { + for _, l := range c.ParentEffectiveLineage { + if l.BranchID == branchID { + return l.CommitID + } + } + return c.ParentEffectiveCommit +} + +func newDiffResultsBatchWriter(tx db.Tx, tableName string) *diffResultsBatchWriter { + return &diffResultsBatchWriter{ + tx: tx, + DiffResultsTableName: tableName, + Records: make([]*diffResultRecord, 0, diffResultsInsertBatchSize), + } +} + +func (d *diffResultsBatchWriter) Write(r *diffResultRecord) error { + // batch and/or 
insert results + d.Records = append(d.Records, r) + if len(d.Records) < diffResultsInsertBatchSize { + return nil + } + return d.Flush() +} + +func (d *diffResultsBatchWriter) Flush() error { + if len(d.Records) == 0 { + return nil + } + if err := insertDiffResultsBatch(d.tx, d.DiffResultsTableName, d.Records); err != nil { + return err + } + d.Records = d.Records[:0] + return nil +} + func (c *cataloger) getDiffSummary(ctx context.Context, tx db.Tx) (map[DifferenceType]int, error) { var results []struct { DiffType int `db:"diff_type"` @@ -112,45 +187,128 @@ func (c *cataloger) getDiffSummary(ctx context.Context, tx db.Tx) (map[Differenc return m, nil } -func (c *cataloger) diffFromParent(ctx context.Context, tx db.Tx, parentID, childID int64) error { - // get the last child commit number of the last parent merge - // if there is none - then it is the first merge - var maxChildMerge CommitID - childLineage, err := getLineage(tx, childID, UncommittedID) - if err != nil { - return fmt.Errorf("child lineage failed: %w", err) - } - parentLineage, err := getLineage(tx, parentID, CommittedID) - if err != nil { - return fmt.Errorf("parent lineage failed: %w", err) - } - maxChildQuery, args, err := sq.Select("MAX(commit_id) as max_child_commit"). +func (c *cataloger) diffFromParent(ctx context.Context, tx db.Tx, parentID, childID int64, limit int, after string) error { + // get child last commit of merge from parent + var childLastFromParentCommitID CommitID + query, args, err := psql.Select("MAX(commit_id) as max_child_commit"). From("catalog_commits"). Where("branch_id = ? AND merge_type = 'from_parent'", childID). - PlaceholderFormat(sq.Dollar). ToSql() if err != nil { return fmt.Errorf("get child last commit sql: %w", err) } - err = tx.Get(&maxChildMerge, maxChildQuery, args...) + err = tx.Get(&childLastFromParentCommitID, query, args...) 
if err != nil { return fmt.Errorf("get child last commit failed: %w", err) } - diffResultsTableName, err := diffResultsTableNameFromContext(ctx) + + diffResultsTableName, err := createDiffResultsTable(ctx, tx) if err != nil { return err } - diffFromParentSQL, args, err := sqDiffFromParentV(parentID, childID, maxChildMerge, parentLineage, childLineage). - Prefix(`CREATE UNLOGGED TABLE ` + diffResultsTableName + " AS "). - PlaceholderFormat(sq.Dollar). - ToSql() + + scannerOpts := DBScannerOptions{ + After: after, + AdditionalFields: []string{DBEntryFieldChecksum}, + } + parentScanner := NewDBLineageScanner(tx, parentID, CommittedID, &scannerOpts) + childScanner := NewDBLineageScanner(tx, childID, UncommittedID, &scannerOpts) + childLineage, err := childScanner.ReadLineage() if err != nil { - return fmt.Errorf("diff from parent sql: %w", err) + return err + } + batch := newDiffResultsBatchWriter(tx, diffResultsTableName) + var childEnt *DBScannerEntry + records := 0 + for parentScanner.Next() { + // check whether the parent element is relevant + parentEnt := parentScanner.Value() + + // get next child entry - scan until we match child's path to parent (or bigger) + childEnt, err = ScanDBEntryUntil(childScanner, childEnt, parentEnt.Path) + if err != nil { + return fmt.Errorf("scan next child element: %w", err) + } + + // point to matched child based on path + var matchedChild *DBScannerEntry + if childEnt != nil && childEnt.Path == parentEnt.Path { + matchedChild = childEnt + } + // diff between entries + diffType := evaluateFromParentElementDiffType(childID, childLastFromParentCommitID, childLineage, parentEnt, matchedChild) + if diffType == DifferenceTypeNone { + continue + } + + diffRec := &diffResultRecord{ + DiffType: diffType, + Entry: parentEnt.Entry, + } + if matchedChild != nil && matchedChild.BranchID == childID && diffType != DifferenceTypeConflict && diffType != DifferenceTypeRemoved { + diffRec.EntryCtid = &parentEnt.RowCtid + } + err = batch.Write(diffRec) + if err != 
nil { + return err + } + + // stop on limit + records++ + if limit > -1 && records >= limit { + break + } } - if _, err := tx.Exec(diffFromParentSQL, args...); err != nil { - return fmt.Errorf("select diff from parent: %w", err) + if err := parentScanner.Err(); err != nil { + return err } - return nil + return batch.Flush() +} + +func lineageCommitIDByBranchID(lineage []lineageCommit, branchID int64) CommitID { + for _, l := range lineage { + if l.BranchID == branchID { + return l.CommitID + } + } + return UncommittedID +} + +func evaluateFromParentElementDiffType(childBranchID int64, childLastFromParentCommitID CommitID, childLineage []lineageCommit, parentEnt *DBScannerEntry, matchedChild *DBScannerEntry) DifferenceType { + // both deleted - none + if parentEnt.IsDeleted() && (matchedChild == nil || matchedChild.IsDeleted()) { + return DifferenceTypeNone + } + + // same entry - none + if matchedChild != nil && parentEnt.IsDeleted() == matchedChild.IsDeleted() && parentEnt.Checksum == matchedChild.Checksum { + return DifferenceTypeNone + } + + // parent not changed - none + commitIDByChildLineage := lineageCommitIDByBranchID(childLineage, parentEnt.BranchID) + parentChangedAfterChild := parentEnt.ChangedAfterCommit(commitIDByChildLineage) + if !parentChangedAfterChild { + return DifferenceTypeNone + } + + // child entry is uncommitted or updated after merge from parent - conflict + if matchedChild != nil && matchedChild.BranchID == childBranchID && + (!matchedChild.IsCommitted() || matchedChild.ChangedAfterCommit(childLastFromParentCommitID)) { + return DifferenceTypeConflict + } + + // parent deleted - removed + if parentEnt.IsDeleted() { + return DifferenceTypeRemoved + } + + // child delete - add + if matchedChild == nil || matchedChild.IsDeleted() { + return DifferenceTypeAdded + } + // child exists - change + return DifferenceTypeChanged } func getDiffDifferences(ctx context.Context, tx db.Tx, limit int, after string) (Differences, error) { @@ -175,75 
+333,194 @@ func getDiffDifferences(ctx context.Context, tx db.Tx, limit int, after string) return result, nil } -func (c *cataloger) diffFromChild(ctx context.Context, tx db.Tx, childID, parentID int64) error { - // read last merge commit numbers from commit table - // if it is the first child-to-parent commit, than those commit numbers are calculated as follows: - // the child is 0, as any change in the child was never merged to the parent. - // the parent is the effective commit number of the first lineage record of the child that points to the parent - // it is possible that the child the have already done from_parent merge. so we have to take the minimal effective commit - effectiveCommits := struct { - ParentEffectiveCommit CommitID `db:"parent_effective_commit"` // last commit parent synchronized with child. If non - it is the commit where the child was branched - ChildEffectiveCommit CommitID `db:"child_effective_commit"` // last commit child synchronized to parent. if never - than it is 1 (everything in the child is a change) - }{} - - effectiveCommitsQuery, args, err := sq.Select(`commit_id AS parent_effective_commit`, `merge_source_commit AS child_effective_commit`). 
+func (c *cataloger) diffFromChild(ctx context.Context, tx db.Tx, childID, parentID int64, limit int, after string) error { + effectiveCommits, err := c.selectChildEffectiveCommits(tx, childID, parentID) + if err != nil { + return err + } + + diffResultsTableName, err := createDiffResultsTable(ctx, tx) + if err != nil { + return err + } + + scannerOpts := DBScannerOptions{ + After: after, + AdditionalFields: []string{DBEntryFieldChecksum}, + } + childScanner := NewDBBranchScanner(tx, childID, CommittedID, &scannerOpts) + parentScanner := NewDBLineageScanner(tx, parentID, UncommittedID, &scannerOpts) + batch := newDiffResultsBatchWriter(tx, diffResultsTableName) + var parentEnt *DBScannerEntry + records := 0 + for childScanner.Next() { + // check whether the child element is relevant + childEnt := childScanner.Value() + if !childEnt.ChangedAfterCommit(effectiveCommits.ChildEffectiveCommit) { + continue + } + + // get next parent - the next parentEnt whose path >= the child's path + parentEnt, err = ScanDBEntryUntil(parentScanner, parentEnt, childEnt.Path) + if err != nil { + return fmt.Errorf("scan next parent element: %w", err) + } + + diffType := evaluateFromChildElementDiffType(effectiveCommits, parentID, childEnt, parentEnt) + if diffType == DifferenceTypeNone { + continue + } + + diffRecord := &diffResultRecord{ + DiffType: diffType, + Entry: childEnt.Entry, + EntryCtid: &childEnt.RowCtid, + } + if diffType == DifferenceTypeRemoved && parentEnt != nil { + diffRecord.SourceBranch = parentEnt.BranchID + } + + err = batch.Write(diffRecord) + if err != nil { + return err + } + + // stop on limit + records++ + if limit > -1 && records >= limit { + break + } + } + if err := childScanner.Err(); err != nil { + return err + } + return batch.Flush() +} + +func createDiffResultsTable(ctx context.Context, executor sq.Execer) (string, error) { + diffResultsTableName, err := diffResultsTableNameFromContext(ctx) + if err != nil { + return "", err + } + _, err = executor.Exec("CREATE UNLOGGED TABLE " + 
diffResultsTableName + ` ( + source_branch bigint NOT NULL, + diff_type integer NOT NULL, + path character varying COLLATE "C" NOT NULL, + entry_ctid tid + )`) + if err != nil { + return "", fmt.Errorf("diff from child diff result table: %w", err) + } + return diffResultsTableName, nil +} + +func evaluateFromChildElementDiffType(effectiveCommits *diffEffectiveCommits, parentBranchID int64, childEnt *DBScannerEntry, parentEnt *DBScannerEntry) DifferenceType { + var matchedParent *DBScannerEntry + if parentEnt != nil && parentEnt.Path == childEnt.Path { + matchedParent = parentEnt + } + // when the entry was deleted + if childEnt.IsDeleted() { + if matchedParent == nil || matchedParent.IsDeleted() { + return DifferenceTypeNone + } + if matchedParent.BranchID == parentBranchID { + // check if parent did any change to the entity + if matchedParent.MinCommit > effectiveCommits.ParentEffectiveCommit { + return DifferenceTypeConflict + } + return DifferenceTypeRemoved + } + // check if the parent saw this entry at the time + if matchedParent.MinCommit >= effectiveCommits.ParentEffectiveCommitByBranchID(matchedParent.BranchID) { + return DifferenceTypeRemoved + } + return DifferenceTypeNone + } + // when the entry was not deleted + if matchedParent != nil { + if matchedParent.IsDeleted() { + return DifferenceTypeAdded + } + if matchedParent.BranchID == parentBranchID { + // check if parent did any change to the entity + if matchedParent.MinCommit > effectiveCommits.ParentEffectiveCommit { + return DifferenceTypeConflict + } + return DifferenceTypeChanged + } + if matchedParent.MinCommit >= effectiveCommits.ParentEffectiveCommitByBranchID(matchedParent.BranchID) { + return DifferenceTypeChanged + } + return DifferenceTypeAdded + } + + return DifferenceTypeAdded +} + +func insertDiffResultsBatch(exerciser sq.Execer, tableName string, batch []*diffResultRecord) error { + if len(batch) == 0 { + return nil + } + ins := psql.Insert(tableName).Columns("source_branch", 
"diff_type", "path", "entry_ctid") + for _, rec := range batch { + ins = ins.Values(rec.SourceBranch, rec.DiffType, rec.Entry.Path, rec.EntryCtid) + } + query, args, err := ins.ToSql() + if err != nil { + return fmt.Errorf("query for diff results insert: %w", err) + } + _, err = exerciser.Exec(query, args...) + if err != nil { + return fmt.Errorf("insert query diff results: %w", err) + } + return nil +} + +// selectChildEffectiveCommits - read last merge commit numbers from commit table +// if it is the first child-to-parent commit, than those commit numbers are calculated as follows: +// the child is 0, as any change in the child was never merged to the parent. +// the parent is the effective commit number of the first lineage record of the child that points to the parent +// it is possible that the child the have already done from_parent merge. so we have to take the minimal effective commit +func (c *cataloger) selectChildEffectiveCommits(tx db.Tx, childID int64, parentID int64) (*diffEffectiveCommits, error) { + effectiveCommitsQuery, args, err := psql.Select(`commit_id AS parent_effective_commit`, `merge_source_commit AS child_effective_commit`). From("catalog_commits"). Where("branch_id = ? AND merge_source_branch = ? AND merge_type = 'from_child'", parentID, childID). OrderBy(`commit_id DESC`). Limit(1). - PlaceholderFormat(sq.Dollar). ToSql() if err != nil { - return fmt.Errorf("effective commits sql: %w", err) + return nil, err } + var effectiveCommits diffEffectiveCommits err = tx.Get(&effectiveCommits, effectiveCommitsQuery, args...) effectiveCommitsNotFound := errors.Is(err, db.ErrNotFound) if err != nil && !effectiveCommitsNotFound { - return fmt.Errorf("select effective commit: %w", err) + return nil, err } if effectiveCommitsNotFound { effectiveCommits.ChildEffectiveCommit = 1 // we need all commits from the child. so any small number will do parentEffectiveQuery, args, err := psql.Select("commit_id as parent_effective_commit"). 
From("catalog_commits"). - Where("branch_id = ? AND merge_source_branch = ?", childID, parentID). + Where("branch_id = ? AND merge_source_branch = ? AND merge_type = 'from_parent'", childID, parentID). OrderBy("commit_id desc"). Limit(1). ToSql() if err != nil { - return fmt.Errorf("parent effective commit sql: %w", err) + return nil, err } err = tx.Get(&effectiveCommits.ParentEffectiveCommit, parentEffectiveQuery, args...) if err != nil { - return fmt.Errorf("select parent effective commit: %w", err) + return nil, err } } - parentLineage, err := getLineage(tx, parentID, UncommittedID) - if err != nil { - return fmt.Errorf("parent lineage failed: %w", err) - } - childLineage, err := getLineage(tx, childID, CommittedID) + effectiveLineage, err := getLineage(tx, parentID, effectiveCommits.ParentEffectiveCommit) if err != nil { - return fmt.Errorf("child lineage failed: %w", err) - } - - childLineageValues := getLineageAsValues(childLineage, childID, MaxCommitID) - mainDiffFromChild := sqDiffFromChildV(parentID, childID, effectiveCommits.ParentEffectiveCommit, effectiveCommits.ChildEffectiveCommit, parentLineage, childLineageValues) - diffResultsTableName, err := diffResultsTableNameFromContext(ctx) - if err != nil { - return err - } - diffFromChildSQL, args, err := mainDiffFromChild. - Prefix("CREATE UNLOGGED TABLE " + diffResultsTableName + " AS "). - PlaceholderFormat(sq.Dollar). 
- ToSql() - if err != nil { - return fmt.Errorf("diff from child sql: %w", err) - } - if _, err := tx.Exec(diffFromChildSQL, args...); err != nil { - return fmt.Errorf("exec diff from child: %w", err) + return nil, err } - return nil + effectiveCommits.ParentEffectiveLineage = effectiveLineage + return &effectiveCommits, nil } // withDiffResultsContext generate diff results id used for temporary table name @@ -270,7 +547,7 @@ func diffResultsTableNameFormat(id string) string { return diffResultsTableNamePrefix + "_" + id } -func (c *cataloger) diffNonDirect(_ context.Context, _ db.Tx, leftID, rightID int64) error { +func (c *cataloger) diffNonDirect(_ context.Context, _ db.Tx, leftID, rightID int64, _ int, _ string) error { c.log.WithFields(logging.Fields{ "left_id": leftID, "right_id": rightID, diff --git a/catalog/cataloger_merge.go b/catalog/cataloger_merge.go index 0e6076fd8d9..df7a2fedbcd 100644 --- a/catalog/cataloger_merge.go +++ b/catalog/cataloger_merge.go @@ -11,6 +11,10 @@ import ( "github.com/treeverse/lakefs/logging" ) +// Merge perform diff between two branches (left and right), apply changes on right branch and commit +// It uses the cataloger diff internal API to produce a temporary table that we delete at the end of a successful merge +// the table holds entry ctid to reference entries in case of changed/added and source branch in case of delete. 
+// That information is used to address cases where we need to create new entry or tombstone as part of the merge func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBranch, committer, message string, metadata Metadata) (*MergeResult, error) { if err := Validate(ValidateFields{ {Name: "repository", IsValid: ValidateRepositoryName(repository)}, @@ -39,7 +43,7 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran return nil, fmt.Errorf("branch relation: %w", err) } - err = c.doDiffByRelation(ctx, tx, relation, leftID, rightID) + err = c.doDiffByRelation(ctx, tx, relation, leftID, rightID, -1, "") if err != nil { return nil, err } @@ -188,6 +192,7 @@ func (c *cataloger) mergeFromChild(ctx context.Context, tx db.Tx, previousMaxCom if err != nil { return err } + // delete(mark max-commit) entries in parent that are either deleted or changed from child _, err = tx.Exec(`UPDATE catalog_entries SET max_commit = $2 WHERE branch_id = $1 AND max_commit = $5 AND path in (SELECT path FROM `+diffResultsTableName+` WHERE diff_type IN ($3,$4))`, diff --git a/catalog/cataloger_merge_test.go b/catalog/cataloger_merge_test.go index f3b301b637f..9ced1cea063 100644 --- a/catalog/cataloger_merge_test.go +++ b/catalog/cataloger_merge_test.go @@ -700,11 +700,16 @@ func TestCataloger_Merge_FromChildNewEntrySameEntry(t *testing.T) { if len(commitLog.Parents) != 2 { t.Fatal("merge commit log should have two parents") } - if diff := deep.Equal(res.Summary, map[DifferenceType]int{}); diff != nil { + + if diff := deep.Equal(res.Summary, map[DifferenceType]int{ + DifferenceTypeChanged: 1, + }); diff != nil { t.Fatal("Merge Summary", diff) } // TODO(barak): enable test after diff between commits is supported - //expectedDifferences = Differences{} + //expectedDifferences := Differences{ + // Difference{Type: DifferenceTypeModified, Path: "/file0"}, + //} //differences, _, err := c.Diff(ctx, repository, commitLog.Parents[0], 
commitLog.Parents[1], -1, "") //testutil.MustDo(t, "diff merge changes", err) //if !differences.Equal(expectedDifferences) { @@ -1019,8 +1024,24 @@ func TestCataloger_Merge_FromParentThreeBranchesExtended1(t *testing.T) { _, err = c.Merge(ctx, repository, "branch2", "branch1", "tester", "pushing /file111 down", nil) testutil.MustDo(t, "merge branch2 to branch1", err) - _, err = c.Merge(ctx, repository, "branch1", "master", "tester", "pushing /file111 down", nil) - testutil.MustDo(t, "merge branch1 to master", err) + res, err = c.Merge(ctx, repository, "branch1", "master", "tester", "pushing /file111 down", nil) + if !errors.Is(err, ErrConflictFound) { + t.Fatalf("Merge err=%s, expected conflict", err) + } + if res == nil { + t.Fatal("Expected merge result, got none") + } else if res.Reference != "" { + t.Fatalf("Expected empty reference, got %s", res.Reference) + } + if diff := deep.Equal(res.Summary, map[DifferenceType]int{ + DifferenceTypeConflict: 1, + }); diff != nil { + t.Fatal("Merge Summary", diff) + } + + // delete the file to resolve conflict + testutil.MustDo(t, "delete the conflict file on master", + c.DeleteEntry(ctx, repository, "master", "/file111")) // push file111 delete _, err = c.Merge(ctx, repository, "branch1", "branch2", "tester", "delete /file111 up", nil) @@ -1055,30 +1076,6 @@ func TestCataloger_Merge_FromParentThreeBranchesExtended1(t *testing.T) { //if !differences.Equal(expectedDifferences) { // t.Errorf("Merge differences = %s, expected %s", spew.Sdump(differences), spew.Sdump(expectedDifferences)) //} - - res, err = c.Merge(ctx, repository, "branch1", "master", "tester", "try delete /file111 . 
get conflict", nil) - if !errors.Is(err, ErrConflictFound) { - t.Fatalf("Expected to get conflict error, got err=%+v", err) - } - if res == nil { - t.Fatal("Expected merge result, got none") - } else if res.Reference != "" { - t.Fatalf("Expected empty reference, got %s", res.Reference) - } - if diff := deep.Equal(res.Summary, map[DifferenceType]int{ - DifferenceTypeConflict: 1, - }); diff != nil { - t.Fatal("Merge Summary", diff) - } - // TODO(barak): enable test after diff between commits is supported - //expectedDifferences = Differences{ - // Difference{Type: DifferenceTypeConflict, Path: "/file111"}, - //} - //differences, _, err = c.Diff(ctx, repository, commitLog.Parents[0], commitLog.Parents[1], -1, "") - //testutil.MustDo(t, "diff merge changes", err) - //if !differences.Equal(expectedDifferences) { - // t.Errorf("Merge differences = %s, expected %s", spew.Sdump(differences), spew.Sdump(expectedDifferences)) - //} } func TestCataloger_MergeOverDeletedEntries(t *testing.T) { @@ -1117,7 +1114,7 @@ func TestCataloger_MergeOverDeletedEntries(t *testing.T) { } } -func TestCataloger_MergeWitoutDiff(t *testing.T) { +func TestCataloger_MergeWithoutDiff(t *testing.T) { ctx := context.Background() c := testCataloger(t) // setup a report with 'master' with a single file, and branch 'b1' that started after the file was committed @@ -1128,8 +1125,8 @@ func TestCataloger_MergeWitoutDiff(t *testing.T) { _, err = c.CreateBranch(ctx, repository, "b1", "master") testutil.MustDo(t, "create branch b1", err) _, err = c.Merge(ctx, repository, "master", "b1", "tester", "merge nothing from master to b1", nil) - if err.Error() != "no difference was found" { - t.Fatal("did not get 'nothing to commit' error") + if !errors.Is(err, ErrNoDifferenceWasFound) { + t.Fatalf("Merge expected err=%s, expected=%s", err, ErrNoDifferenceWasFound) } testCatalogerCreateEntry(t, ctx, c, repository, "master", "file_dummy", nil, "master1") _, err = c.Commit(ctx, repository, "master", "file_dummy", 
"tester", nil) @@ -1143,7 +1140,65 @@ func TestCataloger_MergeWitoutDiff(t *testing.T) { t.Fatalf("error on merge with no changes:%+v", err) } _, err = c.Merge(ctx, repository, "master", "b1", "tester", "merge nothing from master to b1", nil) - if err.Error() != "no difference was found" { - t.Fatal("did not get 'nothing to commit' error") + if !errors.Is(err, ErrNoDifferenceWasFound) { + t.Fatalf("Merge expected err=%s, expected=%s", err, ErrNoDifferenceWasFound) + } +} + +func TestCataloger_MergeFromChildAfterMergeFromParent(t *testing.T) { + t.Skip("Should we trigger conflict when we apply changes on changes we merged from parent") + ctx := context.Background() + c := testCataloger(t) + // setup a report with 'master' with a single file, and branch 'b1' that started after the file was committed + repository := testCatalogerRepo(t, ctx, c, "repository", "master") + // first entry creation + testCatalogerCreateEntry(t, ctx, c, repository, "master", "fileX", nil, "master") + _, err := c.Commit(ctx, repository, "master", "fileX", "tester", nil) + testutil.MustDo(t, "commit file first time on master", err) + _, err = c.CreateBranch(ctx, repository, "b1", "master") + testutil.MustDo(t, "create branch b1", err) + _, err = c.Merge(ctx, repository, "master", "b1", "tester", "merge nothing from master to b1", nil) + if !errors.Is(err, ErrNoDifferenceWasFound) { + t.Fatalf("merge err=%s, expected ErrNoDifferenceWasFound", err) + } + + // PART I + // two entries on master + testCatalogerCreateEntry(t, ctx, c, repository, "master", "fileY", nil, "master1") + testCatalogerCreateEntry(t, ctx, c, repository, "master", "fileZ", nil, "master1") + _, err = c.Commit(ctx, repository, "master", "fileY and fileZ", "tester", nil) + testutil.MustDo(t, "commit fileY master", err) + // merge them into child + _, err = c.Merge(ctx, repository, "master", "b1", "tester", "merge fileY from master to b1", nil) + testutil.MustDo(t, "merge into branch b1", err) + // delete one of those files in 
b1 + err = c.DeleteEntry(ctx, repository, "b1", "fileY") + testutil.MustDo(t, "delete fileY on b1", err) + testCatalogerCreateEntry(t, ctx, c, repository, "b1", "fileZ", nil, "master1") + _, err = c.Commit(ctx, repository, "b1", "fileY and fileZ", "tester", nil) + testutil.MustDo(t, "commit fileY b1", err) + _, err = c.Merge(ctx, repository, "b1", "master", "tester", "merge nothing from master to b1", nil) + if err != nil { + t.Fatalf("Merge err=%s, expected none", err) + } + + // PART II + // two entries on master + testCatalogerCreateEntry(t, ctx, c, repository, "master", "fileYY", nil, "master1") + testCatalogerCreateEntry(t, ctx, c, repository, "master", "fileZZ", nil, "master1") + _, err = c.Commit(ctx, repository, "master", "fileYY and fileZZ", "tester", nil) + testutil.MustDo(t, "commit fileYY master", err) + // merge them into child + _, err = c.Merge(ctx, repository, "master", "b1", "tester", "merge fileYY from master to b1", nil) + testutil.MustDo(t, "merge into branch b1", err) + // delete one of those files in b1 + err = c.DeleteEntry(ctx, repository, "b1", "fileYY") + testutil.MustDo(t, "delete fileYY on b1", err) + testCatalogerCreateEntry(t, ctx, c, repository, "b1", "fileZZ", nil, "master1") + _, err = c.Commit(ctx, repository, "b1", "fileYY and fileZZ", "tester", nil) + testutil.MustDo(t, "commit fileYY b1", err) + _, err = c.Merge(ctx, repository, "b1", "master", "tester", "merge nothing from master to b1", nil) + if err != nil { + t.Fatalf("Merge err=%s, expected none", err) } } diff --git a/catalog/cataloger_test.go b/catalog/cataloger_test.go index 6d76427bbbc..a92bde062ce 100644 --- a/catalog/cataloger_test.go +++ b/catalog/cataloger_test.go @@ -92,6 +92,7 @@ func testCreateEntryCalcChecksum(key string, seed string) string { } func testVerifyEntries(t testing.TB, ctx context.Context, c Cataloger, repository string, reference string, entries []testEntryInfo) { + t.Helper() for _, entry := range entries { ent, err := c.GetEntry(ctx, repository, 
reference, entry.Path, GetEntryParams{}) if entry.Deleted { diff --git a/catalog/db_branch_reader.go b/catalog/db_branch_reader.go deleted file mode 100644 index e0defe544ce..00000000000 --- a/catalog/db_branch_reader.go +++ /dev/null @@ -1,80 +0,0 @@ -package catalog - -import ( - "fmt" - - sq "github.com/Masterminds/squirrel" - "github.com/treeverse/lakefs/db" -) - -type DBBranchReader struct { - tx db.Tx - branchID int64 - buf []*DBReaderEntry - bufSize int - idx int - EOF bool - after string - commitID CommitID -} - -func NewDBBranchReader(tx db.Tx, branchID int64, commitID CommitID, bufSize int, after string) *DBBranchReader { - return &DBBranchReader{ - tx: tx, - branchID: branchID, - buf: make([]*DBReaderEntry, 0, bufSize), - bufSize: bufSize, - after: after, - idx: 0, - commitID: commitID, - } -} - -func (r *DBBranchReader) shouldAlignMaxCommit() bool { - return r.commitID != CommittedID && r.commitID != UncommittedID -} - -func (r *DBBranchReader) Next() (*DBReaderEntry, error) { - if r.EOF { - return nil, nil - } - if r.idx >= len(r.buf) { - q := sqBranchReaderSelectWithCommitID(r.branchID, r.commitID).Limit(uint64(r.bufSize)).Where("path > ?", r.after) - sql, args, err := q.PlaceholderFormat(sq.Dollar).ToSql() - if err != nil { - return nil, fmt.Errorf("next query format: %w", err) - } - r.idx = 0 - r.buf = r.buf[:0] - err = r.tx.Select(&r.buf, sql, args...) - if err != nil { - return nil, fmt.Errorf("next select: %w", err) - } - } - if len(r.buf) == 0 { - r.EOF = true - return nil, nil - } - nextPk := r.buf[r.idx] - r.idx++ - // if entry was deleted after the max commit that can be read, it must be set to undeleted - if r.shouldAlignMaxCommit() && nextPk.MaxCommit >= r.commitID { - nextPk.MaxCommit = MaxCommitID - } - r.after = nextPk.Path - return nextPk, nil -} - -func sqBranchReaderSelectWithCommitID(branchID int64, commitID CommitID) sq.SelectBuilder { - q := sq.Select("branch_id", "path", "min_commit", "max_commit", "ctid"). 
- Distinct().Options(" ON (branch_id,path)"). - From("catalog_entries"). - Where("branch_id = ?", branchID). - OrderBy("branch_id", "path", "min_commit desc") - if commitID == CommittedID { - q = q.Where("min_commit < ?", MaxCommitID) - } else if commitID > 0 { - q = q.Where("min_commit between 1 and ?", commitID) - } - return q -} diff --git a/catalog/db_branch_reader_test.go b/catalog/db_branch_reader_test.go deleted file mode 100644 index 902ea2da9f6..00000000000 --- a/catalog/db_branch_reader_test.go +++ /dev/null @@ -1,62 +0,0 @@ -package catalog - -import ( - "context" - "strconv" - "testing" - - "github.com/treeverse/lakefs/db" - "github.com/treeverse/lakefs/testutil" -) - -func TestCataloger_DBBranchReader(t *testing.T) { - const numberOfObjects = 100 - ctx := context.Background() - conn, uri := testutil.GetDB(t, databaseURI) - defer conn.Close() - c := TestCataloger{Cataloger: NewCataloger(conn), DbConnURI: uri} - baseBranchName := "b0" - repository := testCatalogerRepo(t, ctx, c, "repo", baseBranchName) - objSkip := []int{1, 2, 3, 5, 7, 11} - bufferSizes := []int{1, 2, 8, 64, 512, 1024 * 4} - maxBranchNumber := len(objSkip) - - testSetupDBReaderData(t, ctx, c, repository, numberOfObjects, maxBranchNumber, baseBranchName, objSkip) - - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - var repoID int64 - testutil.MustDo(t, "get repository id", - tx.Get(&repoID, `SELECT id FROM catalog_repositories WHERE name=$1`, repository)) - - // test different cache sizes - for k := 0; k < len(bufferSizes); k++ { - bufSize := bufferSizes[k] - // test single branch reader - for branchNo := 0; branchNo < maxBranchNumber; branchNo++ { - branchName := "b" + strconv.Itoa(branchNo) - var branchID int64 - testutil.MustDo(t, "get branch id", - tx.Get(&branchID, `SELECT id FROM catalog_branches WHERE repository_id=$1 AND name=$2`, repoID, branchName)) - branchReader := NewDBBranchReader(tx, branchID, UncommittedID, bufSize, "") - objSkipNo := objSkip[branchNo] - for i 
:= 0; ; i += objSkipNo { - o, err := branchReader.Next() - testutil.MustDo(t, "read from branch "+branchName, err) - if o == nil { - if !(i-objSkipNo < numberOfObjects && i >= numberOfObjects) { - t.Fatalf("terminated at i=%d", i) - } - break - } else { - objNum, err := strconv.Atoi(o.Path[4:]) - testutil.MustDo(t, "convert obj number "+o.Path, err) - if objNum != i { - t.Errorf("objNum=%d, i=%d\n", objNum, i) - } - } - } - } - } - return nil, nil - }) -} diff --git a/catalog/db_branch_scanner.go b/catalog/db_branch_scanner.go new file mode 100644 index 00000000000..10efd1f7839 --- /dev/null +++ b/catalog/db_branch_scanner.go @@ -0,0 +1,126 @@ +package catalog + +import ( + sq "github.com/Masterminds/squirrel" + "github.com/treeverse/lakefs/db" +) + +const DBScannerDefaultBufferSize = 1024 + +type DBBranchScanner struct { + opts DBScannerOptions + tx db.Tx + branchID int64 + commitID CommitID + buf []*DBScannerEntry + idx int + after string + ended bool + err error + value *DBScannerEntry +} + +func NewDBBranchScanner(tx db.Tx, branchID int64, commitID CommitID, opts *DBScannerOptions) *DBBranchScanner { + s := &DBBranchScanner{ + tx: tx, + branchID: branchID, + idx: 0, + commitID: commitID, + } + if opts != nil { + s.opts = *opts + s.after = opts.After + } + if s.opts.BufferSize == 0 { + s.opts.BufferSize = DBScannerDefaultBufferSize + } + s.buf = make([]*DBScannerEntry, 0, s.opts.BufferSize) + return s +} + +func (s *DBBranchScanner) Next() bool { + if s.hasEnded() { + return false + } + if !s.readBufferIfNeeded() { + return false + } + s.value = s.buf[s.idx] + // if entry was deleted after the max commit that can be read, it must be set to undeleted + if s.shouldAlignMaxCommit() && s.value.MaxCommit >= s.commitID { + s.value.MaxCommit = MaxCommitID + } + s.after = s.value.Path + s.idx++ + return true +} + +func (s *DBBranchScanner) Err() error { + return s.err +} + +func (s *DBBranchScanner) Value() *DBScannerEntry { + if s.hasEnded() { + return nil + } + return 
s.value +} + +func (s *DBBranchScanner) hasEnded() bool { + return s.ended || s.err != nil +} + +func (s *DBBranchScanner) shouldAlignMaxCommit() bool { + return s.commitID != CommittedID && s.commitID != UncommittedID +} + +func (s *DBBranchScanner) readBufferIfNeeded() bool { + if s.idx < len(s.buf) { + return true + } + // start fresh + s.idx = 0 + s.buf = s.buf[:0] + // query entries + var query string + var args []interface{} + q := s.buildQuery() + query, args, s.err = q.PlaceholderFormat(sq.Dollar).ToSql() + if s.err != nil { + return false + } + s.err = s.tx.Select(&s.buf, query, args...) + if s.err != nil { + return false + } + // mark iterator ended if no results + if len(s.buf) == 0 { + s.ended = true + return false + } + return true +} + +func (s *DBBranchScanner) buildQuery() sq.SelectBuilder { + q := sq.Select("branch_id", "path", "min_commit", "max_commit", "ctid"). + Distinct().Options(" ON (branch_id,path)"). + From("catalog_entries"). + Where("branch_id = ?", s.branchID). + OrderBy("branch_id", "path", "min_commit desc"). + Limit(uint64(s.opts.BufferSize)) + if s.after != "" { + q = q.Where("path > ?", s.after) + } + if s.commitID == CommittedID { + q = q.Where("min_commit < ?", MaxCommitID) + } else if s.commitID > 0 { + q = q.Where("min_commit between 1 and ?", s.commitID) + } + if len(s.opts.AdditionalFields) > 0 { + q = q.Columns(s.opts.AdditionalFields...) 
+ } + if s.opts.AdditionalWhere != nil { + q = q.Where(s.opts.AdditionalWhere) + } + return q +} diff --git a/catalog/db_branch_scanner_test.go b/catalog/db_branch_scanner_test.go new file mode 100644 index 00000000000..0e8c2c83551 --- /dev/null +++ b/catalog/db_branch_scanner_test.go @@ -0,0 +1,147 @@ +package catalog + +import ( + "context" + "fmt" + "strconv" + "testing" + + sq "github.com/Masterminds/squirrel" + + "github.com/davecgh/go-spew/spew" + "github.com/treeverse/lakefs/db" + "github.com/treeverse/lakefs/testutil" +) + +func TestDBBranchScanner(t *testing.T) { + const numberOfObjects = 100 + ctx := context.Background() + conn, uri := testutil.GetDB(t, databaseURI) + defer func() { _ = conn.Close() }() + c := TestCataloger{Cataloger: NewCataloger(conn), DbConnURI: uri} + + const baseBranchName = "b0" + repository := testCatalogerRepo(t, ctx, c, "repo", baseBranchName) + + // get branch ID once + var branchID int64 + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + var err error + branchID, err = getBranchID(tx, repository, "b0", LockTypeNone) + testutil.MustDo(t, "get branch id", err) + return nil, nil + }) + + t.Run("empty", func(t *testing.T) { + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + scanner := NewDBBranchScanner(tx, branchID, UncommittedID, &DBScannerOptions{BufferSize: 64}) + firstNext := scanner.Next() + if firstNext { + t.Fatalf("first entry should be false") + } + if scanner.Err() != nil { + t.Fatalf("first entry should not get error, got=%s", scanner.Err()) + } + v := scanner.Value() + if v != nil { + t.Fatalf("Scanner Value=%+v, expected nil", v) + } + return nil, nil + }) + }) + + objSkip := []int{1, 2, 3, 5, 7, 11} + testSetupDBScannerData(t, ctx, c, repository, numberOfObjects, baseBranchName, objSkip) + + t.Run("additional_fields", func(t *testing.T) { + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + scanner := NewDBBranchScanner(tx, branchID, UncommittedID, &DBScannerOptions{ + 
AdditionalFields: []string{"checksum", "physical_address"}, + }) + testedSomething := false + for scanner.Next() { + ent := scanner.Value() + if ent.Checksum == "" { + t.Fatalf("Entry missing value for Checksum: %s", spew.Sdump(ent)) + } + if ent.PhysicalAddress == "" { + t.Fatalf("Entry missing value for PhysicalAddress: %s", spew.Sdump(ent)) + } + if !testedSomething { + testedSomething = true + } + } + testutil.MustDo(t, "read from branch for additional fields", scanner.Err()) + if !testedSomething { + t.Fatal("Not tested something with additional fields") + } + return nil, nil + }) + }) + + t.Run("additional_where", func(t *testing.T) { + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + p := fmt.Sprintf("Obj-%04d", numberOfObjects-5) + scanner := NewDBBranchScanner(tx, branchID, UncommittedID, &DBScannerOptions{ + AdditionalWhere: sq.Expr("path=?", p), + }) + var ent *DBScannerEntry + for scanner.Next() { + ent = scanner.Value() + } + testutil.MustDo(t, "read from branch for additional fields", scanner.Err()) + if ent == nil { + t.Fatal("missing entry") + } else if ent.Path != p { + t.Fatalf("Read entry with path=%s, expected=%s", ent.Path, p) + } + return nil, nil + }) + }) + + t.Run("buffer_sizes", func(t *testing.T) { + bufferSizes := []int{1, 2, 3, 8, 19, 64, 512} + for _, bufSize := range bufferSizes { + for branchNo, objSkipNo := range objSkip { + branchName := "b" + strconv.Itoa(branchNo) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + branchID, err := getBranchID(tx, repository, branchName, LockTypeNone) + testutil.MustDo(t, "get branch ID", err) + scanner := NewDBBranchScanner(tx, branchID, UncommittedID, &DBScannerOptions{BufferSize: bufSize}) + i := 0 + for scanner.Next() { + o := scanner.Value() + + objNum, err := strconv.Atoi(o.Path[4:]) + testutil.MustDo(t, "convert obj number "+o.Path, err) + if objNum != i { + t.Errorf("objNum=%d, i=%d", objNum, i) + } + + i += objSkipNo + } + testutil.MustDo(t, "read from branch 
"+branchName, scanner.Err()) + if !(i-objSkipNo < numberOfObjects && i >= numberOfObjects) { + t.Fatalf("terminated at i=%d", i) + } + return nil, nil + }) + } + } + }) +} + +func testSetupDBScannerData(t *testing.T, ctx context.Context, c TestCataloger, repository string, numberOfObjects int, baseBranchName string, objSkip []int) { + for branchNo, skipCount := range objSkip { + branchName := "b" + strconv.Itoa(branchNo) + if branchNo > 0 { + testCatalogerBranch(t, ctx, c, repository, branchName, baseBranchName) + } + for i := 0; i < numberOfObjects; i += skipCount { + testCatalogerCreateEntry(t, ctx, c, repository, branchName, fmt.Sprintf("Obj-%04d", i), nil, "") + } + _, err := c.Commit(ctx, repository, branchName, "commit to "+branchName, "tester", nil) + testutil.MustDo(t, "commit to "+branchName, err) + baseBranchName = branchName + } +} diff --git a/catalog/db_lineage_reader.go b/catalog/db_lineage_reader.go deleted file mode 100644 index d47c9eaff25..00000000000 --- a/catalog/db_lineage_reader.go +++ /dev/null @@ -1,94 +0,0 @@ -package catalog - -import ( - "fmt" - - "github.com/treeverse/lakefs/db" -) - -type DBLineageReader struct { - tx db.Tx - branchID int64 - EOF bool - commitID CommitID - readers []*DBBranchReader - nextRow []*DBReaderEntry - firstTime bool - limit int - returnedRows int -} - -const ( - DBReaderMaxLimit = 1000 -) - -func NewDBLineageReader(tx db.Tx, branchID int64, commitID CommitID, bufSize int, limit int, after string) (*DBLineageReader, error) { - lineage, err := getLineage(tx, branchID, commitID) - if err != nil { - return nil, fmt.Errorf("error getting lineage: %w", err) - } - if limit > DBReaderMaxLimit { - limit = DBReaderMaxLimit - } - lr := &DBLineageReader{ - tx: tx, - branchID: branchID, - commitID: commitID, - firstTime: true, - readers: make([]*DBBranchReader, len(lineage)+1), - limit: limit, - } - lr.readers[0] = NewDBBranchReader(tx, branchID, commitID, bufSize, after) - for i, bl := range lineage { - lr.readers[i+1] = 
NewDBBranchReader(tx, bl.BranchID, bl.CommitID, bufSize, after) - } - lr.nextRow = make([]*DBReaderEntry, len(lr.readers)) - for i, reader := range lr.readers { - e, err := reader.Next() - if err != nil { - return nil, fmt.Errorf("getting entry from branch ID %d: %w", reader.branchID, err) - } - lr.nextRow[i] = e - } - return lr, nil -} - -func (r *DBLineageReader) Next() (*DBReaderEntry, error) { - if r.EOF { - return nil, nil - } - var selectedEntry *DBReaderEntry - // indirection array, to skip lineage branches that reached end - nonNilNextRow := make([]int, 0, len(r.nextRow)) - for i, ent := range r.nextRow { - if ent != nil { - nonNilNextRow = append(nonNilNextRow, i) - } - } - if len(nonNilNextRow) == 0 { - r.EOF = true - return nil, nil - } - // find lowest Path - selectedEntry = r.nextRow[nonNilNextRow[0]] - for i := 1; i < len(nonNilNextRow); i++ { - if selectedEntry.Path > r.nextRow[nonNilNextRow[i]].Path { - selectedEntry = r.nextRow[nonNilNextRow[i]] - } - } - r.returnedRows++ - if r.limit >= 0 && r.returnedRows >= r.limit { - r.EOF = true - } - // advance next row for all branches that have this Path - for i := 0; i < len(nonNilNextRow); i++ { - if r.nextRow[nonNilNextRow[i]].Path == selectedEntry.Path { - n, err := r.readers[nonNilNextRow[i]].Next() - if err != nil { - return nil, fmt.Errorf("error getting entry on branch : %w", err) - } - r.nextRow[nonNilNextRow[i]] = n - } - } - return selectedEntry, nil -} diff --git a/catalog/db_lineage_reader_test.go b/catalog/db_lineage_reader_test.go deleted file mode 100644 index 59644651b0a..00000000000 --- a/catalog/db_lineage_reader_test.go +++ /dev/null @@ -1,250 +0,0 @@ -package catalog - -import ( - "context" - "fmt" - "strconv" - "testing" - - "github.com/treeverse/lakefs/db" - "github.com/treeverse/lakefs/testutil" -) - -func TestCataloger_DBLineageReader(t *testing.T) { - const numberOfObjects = 10 - - ctx := context.Background() - conn, uri := testutil.GetDB(t, databaseURI) - defer conn.Close() - c 
:= TestCataloger{Cataloger: NewCataloger(conn), DbConnURI: uri} - baseBranchName := "b0" - repository := testCatalogerRepo(t, ctx, c, "repo", baseBranchName) - objSkip := []int{1, 2, 3, 5, 7, 11} - bufferSizes := []int{1, 2, 8, 64, 512, 1024 * 4} - maxBranchNumber := len(objSkip) - - testSetupDBReaderData(t, ctx, c, repository, numberOfObjects, maxBranchNumber, baseBranchName, objSkip) - - t.Run("cache_sizes", func(t *testing.T) { - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - // test different cache sizes - for k := 0; k < len(bufferSizes); k++ { - bufSize := bufferSizes[k] - // test lineage reader - for branchNo := 0; branchNo < maxBranchNumber; branchNo++ { - branchName := "b" + strconv.Itoa(branchNo) - lineageReader, err := NewDBLineageReader(tx, int64(branchNo+2), UncommittedID, bufSize, -1, "") - testutil.MustDo(t, "new lineage reader "+branchName, err) - for i := 0; i < numberOfObjects; i++ { - var expectedBranch int64 - o, err := lineageReader.Next() - testutil.MustDo(t, "read from lineage "+branchName, err) - if o == nil { - t.Errorf("Got nil obj, branch=%s, number=%d", branchName, i) - } - // check item read from might branch - for j := branchNo; j >= 0; j-- { - if i%objSkip[j] == 0 { - expectedBranch = int64(j + 2) - break - } - } - if o.BranchID != expectedBranch { - t.Errorf("fetch from wrong branch.branchName=%s branchNumber=%d, i =%d\n", branchName, o.BranchID, i) - } - } - } - } - return nil, nil - }) - }) - - // test reading committed and uncommitted data - bufSize := 8 - t.Run("uncommitted", func(t *testing.T) { - testCatalogerCreateEntry(t, ctx, c, repository, "b1", "Obj-0004", nil, "sd1") - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB1U, err := NewDBLineageReader(tx, 3, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(3), err) - lineageReaderB1C, err := NewDBLineageReader(tx, 3, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new 
lineage reader branchID="+strconv.Itoa(3), err) - lineageReaderB2U, err := NewDBLineageReader(tx, 4, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB1U, "read 0004 lineage b1 U ", 3, MinCommitUncommittedIndicator, MaxCommitID) - testDBReaderNext(t, lineageReaderB2U, "read 0004 lineage b2 U ", 3, 5, MaxCommitID) - testDBReaderNext(t, lineageReaderB1C, "read 0004 lineage b1 C ", 3, 5, MaxCommitID) - testDBReaderNext(t, lineageReaderB2C, "read 0004 lineage b2 C ", 3, 5, MaxCommitID) - return nil, nil - }) - }) - t.Run("committed", func(t *testing.T) { - _, err := c.Commit(ctx, repository, "b1", "commit to b1", "tester", nil) - testutil.MustDo(t, "commit to b1", err) - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB1U, err := NewDBLineageReader(tx, 3, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(3), err) - lineageReaderB1C, err := NewDBLineageReader(tx, 3, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(3), err) - lineageReaderB2U, err := NewDBLineageReader(tx, 4, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB1U, "read 0004 lineage b1 U ", 3, 14, MaxCommitID) - testDBReaderNext(t, lineageReaderB1C, "read 0004 lineage b1 C ", 3, 14, MaxCommitID) - testDBReaderNext(t, lineageReaderB2U, "read 0004 lineage b2 U ", 3, 5, MaxCommitID) - testDBReaderNext(t, lineageReaderB2C, "read 0004 lineage b2 C ", 3, 5, 
MaxCommitID) - return nil, nil - }) - }) - - t.Run("merged", func(t *testing.T) { - _, err := c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) - testutil.MustDo(t, "merge b1 into b2", err) - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB2U, err := NewDBLineageReader(tx, 4, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB2U, "read 0004 lineage b1 U ", 3, 14, MaxCommitID) - testDBReaderNext(t, lineageReaderB2C, "read 0004 lineage b1 U ", 3, 14, MaxCommitID) - return nil, nil - }) - }) - - t.Run("delete_uncommitted", func(t *testing.T) { - testutil.MustDo(t, "delete committed file on b1", - c.DeleteEntry(ctx, repository, "b1", "Obj-0004")) - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB1U, err := NewDBLineageReader(tx, 3, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(3), err) - lineageReaderB1C, err := NewDBLineageReader(tx, 3, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(3), err) - lineageReaderB2U, err := NewDBLineageReader(tx, 4, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB1U, "read 0004 lineage b1 U ", 3, MinCommitUncommittedIndicator, 0) - testDBReaderNext(t, lineageReaderB1C, "read 0004 lineage b1 C ", 3, 14, MaxCommitID) - testDBReaderNext(t, lineageReaderB2U, "read 0004 lineage b2 U ", 3, 14, MaxCommitID) - testDBReaderNext(t, lineageReaderB2C, "read 
0004 lineage b2 C ", 3, 14, MaxCommitID) - return nil, nil - }) - }) - - t.Run("delete_committed", func(t *testing.T) { - _, err := c.Commit(ctx, repository, "b1", "commit to b1", "tester", nil) - testutil.MustDo(t, "commit to b1", err) - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB1U, err := NewDBLineageReader(tx, 3, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(3), err) - lineageReaderB1C, err := NewDBLineageReader(tx, 3, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(3), err) - lineageReaderB2U, err := NewDBLineageReader(tx, 4, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB1U, "read 0004 lineage b1 U ", 3, 14, 14) - testDBReaderNext(t, lineageReaderB1C, "read 0004 lineage b1 C ", 3, 14, 14) - testDBReaderNext(t, lineageReaderB2U, "read 0004 lineage b2 U ", 3, 14, MaxCommitID) - testDBReaderNext(t, lineageReaderB2C, "read 0004 lineage b2 C ", 3, 14, MaxCommitID) - return nil, nil - }) - }) - - t.Run("merge", func(t *testing.T) { - _, err := c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) - testutil.MustDo(t, "merge b1 into b2", err) - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB2U, err := NewDBLineageReader(tx, 4, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB2U, "read 0004 lineage b2 U ", 3, 14, 14) - testDBReaderNext(t, lineageReaderB2C, "read 0004 
lineage b2 C ", 3, 14, 14) - return nil, nil - }) - }) - - t.Run("merge", func(t *testing.T) { - testCatalogerCreateEntry(t, ctx, c, repository, "b1", "Obj-0004", nil, "sd2") - _, err := c.Commit(ctx, repository, "b1", "commit to b1", "tester", nil) - testutil.MustDo(t, "commit to b1", err) - _, err = c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) - testutil.MustDo(t, "merge b1 into b2", err) - testutil.MustDo(t, "delete committed file on b2", - c.DeleteEntry(ctx, repository, "b2", "Obj-0004")) - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB2U, err := NewDBLineageReader(tx, 4, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB2U, "read 0004 lineage b2 U ", 4, MinCommitUncommittedIndicator, 0) - testDBReaderNext(t, lineageReaderB2C, "read 0004 lineage b2 C ", 3, 18, MaxCommitID) - return nil, nil - }) - }) - - t.Run("merge", func(t *testing.T) { - _, err := c.Commit(ctx, repository, "b2", "commit to b2", "tester", nil) - testutil.MustDo(t, "commit to b1", err) - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB2U, err := NewDBLineageReader(tx, 4, UncommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB2U, "read 0004 lineage b2 U ", 4, 20, 0) - testDBReaderNext(t, lineageReaderB2C, "read 0004 lineage b2 C ", 4, 20, 0) - return nil, nil - }) - }) - - t.Run("merge", func(t *testing.T) { - testCatalogerCreateEntry(t, ctx, c, repository, "b0", "Obj-00041", nil, "sd4") - _, err := c.Commit(ctx, 
repository, "b0", "commit to b0", "tester", nil) - testutil.MustDo(t, "commit to b0", err) - _, err = c.Merge(ctx, repository, "b0", "b1", "tester", "", nil) - testutil.MustDo(t, "merge b0 into b1", err) - _, err = c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) - testutil.MustDo(t, "merge b1 into b2", err) - - testCatalogerCreateEntry(t, ctx, c, repository, "b0", "Obj-0004", nil, "sd3") - _, err = c.Commit(ctx, repository, "b0", "commit to b0", "tester", nil) - testutil.MustDo(t, "commit to b0", err) - _, err = c.Merge(ctx, repository, "b0", "b1", "tester", "", nil) - testutil.MustDo(t, "merge b0 into b1", err) - _, err = c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) - testutil.MustDo(t, "merge b1 into b2", err) - _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { - lineageReaderB2C, err := NewDBLineageReader(tx, 4, CommittedID, bufSize, -1, "Obj-0003") - testutil.MustDo(t, "new lineage reader branchID="+strconv.Itoa(4), err) - testDBReaderNext(t, lineageReaderB2C, "read 0004 lineage b2 C ", 4, 26, MaxCommitID) - return nil, nil - }) - }) -} - -func testSetupDBReaderData(t *testing.T, ctx context.Context, c TestCataloger, repository string, numberOfObjects int, maxBranchNumber int, baseBranchName string, objSkip []int) { - for branchNo := 0; branchNo < maxBranchNumber; branchNo++ { - branchName := "b" + strconv.Itoa(branchNo) - if branchNo > 0 { - testCatalogerBranch(t, ctx, c, repository, branchName, baseBranchName) - } - for i := 0; i < numberOfObjects; i += objSkip[branchNo] { - testCatalogerCreateEntry(t, ctx, c, repository, branchName, fmt.Sprintf("Obj-%04d", i), nil, "") - } - _, err := c.Commit(ctx, repository, branchName, "commit to "+branchName, "tester", nil) - testutil.MustDo(t, "commit to "+branchName, err) - baseBranchName = branchName - } -} - -func testDBReaderNext(t *testing.T, lReader *DBLineageReader, msg string, expBranch, expMinCommit CommitID, expMaxCommit CommitID) { - t.Helper() - o, err := lReader.Next() - 
testutil.MustDo(t, msg, err) - if o.BranchID != int64(expBranch) || o.MinCommit != CommitID(expMinCommit) || o.MaxCommit != expMaxCommit { - t.Errorf("%s branch=%d (%d), min_commit=%d (%d), max_commit=%d (%d)", msg, o.BranchID, expBranch, o.MinCommit, expMinCommit, o.MaxCommit, expMaxCommit) - } -} diff --git a/catalog/db_lineage_scanner.go b/catalog/db_lineage_scanner.go new file mode 100644 index 00000000000..e163cc95010 --- /dev/null +++ b/catalog/db_lineage_scanner.go @@ -0,0 +1,116 @@ +package catalog + +import ( + "fmt" + + "github.com/treeverse/lakefs/db" +) + +type DBLineageScanner struct { + tx db.Tx + branchID int64 + commitID CommitID + scanners []*DBBranchScanner + ended bool + err error + value *DBScannerEntry + opts DBScannerOptions +} + +func NewDBLineageScanner(tx db.Tx, branchID int64, commitID CommitID, opts *DBScannerOptions) *DBLineageScanner { + s := &DBLineageScanner{ + tx: tx, + branchID: branchID, + commitID: commitID, + } + if opts != nil { + s.opts = *opts + } + return s +} + +func (s *DBLineageScanner) Next() bool { + if s.ended { + return false + } + if !s.ensureBranchScanners() { + return false + } + + // select lowest entry based on path + var selectedEntry *DBScannerEntry + for _, scanner := range s.scanners { + ent := scanner.Value() + if ent != nil && (selectedEntry == nil || selectedEntry.Path > ent.Path) { + selectedEntry = ent + } + } + s.value = selectedEntry + + // mark scanner ended if no new entry was selected + if s.value == nil { + s.ended = true + return false + } + + // move scanners to next item, in case they point to the same path as current element + for _, scanner := range s.scanners { + ent := scanner.Value() + if ent == nil || ent.Path != s.value.Path { + continue + } + if scanner.Next() { + continue + } + if err := scanner.Err(); err != nil { + s.err = fmt.Errorf("getting entry on branch: %w", err) + return false + } + } + return true +} + +func (s *DBLineageScanner) Err() error { + return s.err +} + +func (s 
*DBLineageScanner) Value() *DBScannerEntry { + if s.hasEnded() { + return nil + } + return s.value +} + +func (s *DBLineageScanner) hasEnded() bool { + return s.ended || s.err != nil +} + +func (s *DBLineageScanner) ReadLineage() ([]lineageCommit, error) { + return getLineage(s.tx, s.branchID, s.commitID) +} + +func (s *DBLineageScanner) ensureBranchScanners() bool { + if s.scanners != nil { + return true + } + lineage, err := s.ReadLineage() + if err != nil { + s.err = fmt.Errorf("getting lineage: %w", err) + return false + } + s.scanners = make([]*DBBranchScanner, len(lineage)+1) + s.scanners[0] = NewDBBranchScanner(s.tx, s.branchID, s.commitID, &s.opts) + for i, bl := range lineage { + s.scanners[i+1] = NewDBBranchScanner(s.tx, bl.BranchID, bl.CommitID, &s.opts) + } + for _, branchScanner := range s.scanners { + if branchScanner.Next() { + continue + } + if err := branchScanner.Err(); err != nil { + s.err = fmt.Errorf("getting entry from branch ID %d: %w", branchScanner.branchID, err) + return false + } + } + return true +} diff --git a/catalog/db_lineage_scanner_test.go b/catalog/db_lineage_scanner_test.go new file mode 100644 index 00000000000..0a8cbf34ad3 --- /dev/null +++ b/catalog/db_lineage_scanner_test.go @@ -0,0 +1,222 @@ +package catalog + +import ( + "context" + "strconv" + "testing" + + "github.com/treeverse/lakefs/db" + "github.com/treeverse/lakefs/testutil" +) + +func TestDBLineageScanner(t *testing.T) { + const numberOfObjects = 16 + + ctx := context.Background() + conn, uri := testutil.GetDB(t, databaseURI) + defer func() { _ = conn.Close() }() + + c := TestCataloger{Cataloger: NewCataloger(conn), DbConnURI: uri} + baseBranchName := "b0" + repository := testCatalogerRepo(t, ctx, c, "repo", baseBranchName) + + objSkip := []int{1, 2, 3, 5, 7, 11} + testSetupDBScannerData(t, ctx, c, repository, numberOfObjects, baseBranchName, objSkip) + + // bufferSizes - keep buffer size samples less and more then numberOfObjects + bufferSizes := []int{1, 2, 3, 8, 
19, 64, 512} + for _, bufSize := range bufferSizes { + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + // test lineage scanner + for branchNo := range objSkip { + branchName := "b" + strconv.Itoa(branchNo) + branchID, err := getBranchID(tx, repository, branchName, LockTypeNone) + testutil.MustDo(t, "get branch id", err) + scanner := NewDBLineageScanner(tx, branchID, UncommittedID, &DBScannerOptions{BufferSize: bufSize}) + for i := 0; scanner.Next(); i++ { + o := scanner.Value() + if o == nil { + t.Fatal("Value() return nil, expected value") + } + // check item read from the right branch + var expectedBranch int64 + for j := branchNo; j >= 0; j-- { + if i%objSkip[j] == 0 { + expectedBranch = int64(j + 2) + break + } + } + if o.BranchID != expectedBranch { + t.Fatalf("Read entry with branchID=%d, expected=%d (branch=%s, number=%d)", + o.BranchID, expectedBranch, branchName, i) + } + } + v := scanner.Value() + if v != nil { + t.Fatalf("Value() returned %+v, expected nil after iteration", v) + } + testutil.MustDo(t, "next from lineage scanner", scanner.Err()) + } + return nil, nil + }) + } + + // get branch IDs once + var b1BranchID, b2BranchID int64 + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + var err error + b1BranchID, err = getBranchID(tx, repository, "b1", LockTypeNone) + testutil.MustDo(t, "get branch id", err) + b2BranchID, err = getBranchID(tx, repository, "b2", LockTypeNone) + testutil.MustDo(t, "get branch id", err) + return nil, nil + }) + + // test reading committed and uncommitted data + const bufSize = 8 + scannerOpts := &DBScannerOptions{BufferSize: bufSize, After: "Obj-0003"} + testCatalogerCreateEntry(t, ctx, c, repository, "b1", "Obj-0004", nil, "sd1") + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB1U := NewDBLineageScanner(tx, b1BranchID, UncommittedID, scannerOpts) + lineageScannerB1C := NewDBLineageScanner(tx, b1BranchID, CommittedID, scannerOpts) + lineageScannerB2U := 
NewDBLineageScanner(tx, b2BranchID, UncommittedID, scannerOpts) + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + + const expectedMinCommit = 5 + testDBScannerNext(t, lineageScannerB1U, "read 0004 lineage b1 U", b1BranchID, MinCommitUncommittedIndicator, MaxCommitID) + testDBScannerNext(t, lineageScannerB2U, "read 0004 lineage b2 U", b1BranchID, expectedMinCommit, MaxCommitID) + testDBScannerNext(t, lineageScannerB1C, "read 0004 lineage b1 C", b1BranchID, expectedMinCommit, MaxCommitID) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b2 C", b1BranchID, expectedMinCommit, MaxCommitID) + return nil, nil + }) + + _, err := c.Commit(ctx, repository, "b1", "commit to b1", "tester", nil) + testutil.MustDo(t, "commit to b1", err) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB1U := NewDBLineageScanner(tx, b1BranchID, UncommittedID, scannerOpts) + lineageScannerB1C := NewDBLineageScanner(tx, b1BranchID, CommittedID, scannerOpts) + lineageScannerB2U := NewDBLineageScanner(tx, b2BranchID, UncommittedID, scannerOpts) + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + testDBScannerNext(t, lineageScannerB1U, "read 0004 lineage b1 U", b1BranchID, 14, MaxCommitID) + testDBScannerNext(t, lineageScannerB1C, "read 0004 lineage b1 C", b1BranchID, 14, MaxCommitID) + testDBScannerNext(t, lineageScannerB2U, "read 0004 lineage b2 U", b1BranchID, 5, MaxCommitID) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b2 C", b1BranchID, 5, MaxCommitID) + return nil, nil + }) + + _, err = c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) + testutil.MustDo(t, "merge b1 into b2", err) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB2U := NewDBLineageScanner(tx, b2BranchID, UncommittedID, scannerOpts) + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + testDBScannerNext(t, lineageScannerB2U, "read 0004 
lineage b1 U", b1BranchID, 14, MaxCommitID) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b1 U", b1BranchID, 14, MaxCommitID) + return nil, nil + }) + + testutil.MustDo(t, "delete committed file on b1", + c.DeleteEntry(ctx, repository, "b1", "Obj-0004")) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB1U := NewDBLineageScanner(tx, b1BranchID, UncommittedID, scannerOpts) + lineageScannerB1C := NewDBLineageScanner(tx, b1BranchID, CommittedID, scannerOpts) + lineageScannerB2U := NewDBLineageScanner(tx, b2BranchID, UncommittedID, scannerOpts) + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + testDBScannerNext(t, lineageScannerB1U, "read 0004 lineage b1 U", b1BranchID, MinCommitUncommittedIndicator, 0) + testDBScannerNext(t, lineageScannerB1C, "read 0004 lineage b1 C", b1BranchID, 14, MaxCommitID) + testDBScannerNext(t, lineageScannerB2U, "read 0004 lineage b2 U", b1BranchID, 14, MaxCommitID) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b2 C", b1BranchID, 14, MaxCommitID) + return nil, nil + }) + + _, err = c.Commit(ctx, repository, "b1", "commit to b1", "tester", nil) + testutil.MustDo(t, "commit to b1", err) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB1U := NewDBLineageScanner(tx, b1BranchID, UncommittedID, scannerOpts) + lineageScannerB1C := NewDBLineageScanner(tx, b1BranchID, CommittedID, scannerOpts) + lineageScannerB2U := NewDBLineageScanner(tx, b2BranchID, UncommittedID, scannerOpts) + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + testDBScannerNext(t, lineageScannerB1U, "read 0004 lineage b1 U", b1BranchID, 14, 14) + testDBScannerNext(t, lineageScannerB1C, "read 0004 lineage b1 C", b1BranchID, 14, 14) + testDBScannerNext(t, lineageScannerB2U, "read 0004 lineage b2 U", b1BranchID, 14, MaxCommitID) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b2 C", b1BranchID, 14, MaxCommitID) + 
return nil, nil + }) + + _, err = c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) + testutil.MustDo(t, "merge b1 into b2", err) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB2U := NewDBLineageScanner(tx, b2BranchID, UncommittedID, scannerOpts) + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + testDBScannerNext(t, lineageScannerB2U, "read 0004 lineage b2 U", b1BranchID, 14, 14) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b2 C", b1BranchID, 14, 14) + return nil, nil + }) + + testCatalogerCreateEntry(t, ctx, c, repository, "b1", "Obj-0004", nil, "sd2") + _, err = c.Commit(ctx, repository, "b1", "commit to b1", "tester", nil) + testutil.MustDo(t, "commit to b1", err) + _, err = c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) + testutil.MustDo(t, "merge b1 into b2", err) + testutil.MustDo(t, "delete committed file on b2", + c.DeleteEntry(ctx, repository, "b2", "Obj-0004")) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB2U := NewDBLineageScanner(tx, b2BranchID, UncommittedID, scannerOpts) + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + testDBScannerNext(t, lineageScannerB2U, "read 0004 lineage b2 U", b2BranchID, MinCommitUncommittedIndicator, 0) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b2 C", b1BranchID, 18, MaxCommitID) + return nil, nil + }) + + _, err = c.Commit(ctx, repository, "b2", "commit to b2", "tester", nil) + testutil.MustDo(t, "commit to b1", err) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB2U := NewDBLineageScanner(tx, b2BranchID, UncommittedID, scannerOpts) + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + testDBScannerNext(t, lineageScannerB2U, "read 0004 lineage b2 U", b2BranchID, 20, 0) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b2 C", b2BranchID, 20, 0) + return nil, nil + }) + 
+ testCatalogerCreateEntry(t, ctx, c, repository, "b0", "Obj-00041", nil, "sd4") + _, err = c.Commit(ctx, repository, "b0", "commit to b0", "tester", nil) + testutil.MustDo(t, "commit to b0", err) + _, err = c.Merge(ctx, repository, "b0", "b1", "tester", "", nil) + testutil.MustDo(t, "merge b0 into b1", err) + _, err = c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) + testutil.MustDo(t, "merge b1 into b2", err) + + testCatalogerCreateEntry(t, ctx, c, repository, "b0", "Obj-0004", nil, "sd3") + _, err = c.Commit(ctx, repository, "b0", "commit to b0", "tester", nil) + testutil.MustDo(t, "commit to b0", err) + _, err = c.Merge(ctx, repository, "b0", "b1", "tester", "", nil) + testutil.MustDo(t, "merge b0 into b1", err) + _, err = c.Merge(ctx, repository, "b1", "b2", "tester", "", nil) + testutil.MustDo(t, "merge b1 into b2", err) + _, _ = conn.Transact(func(tx db.Tx) (interface{}, error) { + lineageScannerB2C := NewDBLineageScanner(tx, b2BranchID, CommittedID, scannerOpts) + testDBScannerNext(t, lineageScannerB2C, "read 0004 lineage b2 C", b2BranchID, 26, MaxCommitID) + return nil, nil + }) +} + +func testDBScannerNext(t *testing.T, scanner *DBLineageScanner, msg string, expBranch int64, expMinCommit CommitID, expMaxCommit CommitID) { + t.Helper() + if !scanner.Next() { + testutil.MustDo(t, msg, scanner.Err()) + return + } + + o := scanner.Value() + if o.BranchID != expBranch { + t.Fatalf("%s branch=%d, expected=%d", + msg, o.BranchID, expBranch) + } + if o.MinCommit != expMinCommit || o.MaxCommit != expMaxCommit { + t.Fatalf("%s min_commit=%d, expected=%d", + msg, o.MinCommit, expMinCommit) + } + if o.MaxCommit != expMaxCommit { + t.Fatalf("%s branch=%d (%d), min_commit=%d (%d), max_commit=%d (%d)", + msg, o.BranchID, expBranch, o.MinCommit, expMinCommit, o.MaxCommit, expMaxCommit) + } +} diff --git a/catalog/db_scanner.go b/catalog/db_scanner.go new file mode 100644 index 00000000000..2ef292fa1b6 --- /dev/null +++ b/catalog/db_scanner.go @@ -0,0 +1,26 @@ 
+package catalog + +import sq "github.com/Masterminds/squirrel" + +type DBScannerOptions struct { + BufferSize int + After string + AdditionalFields []string + AdditionalWhere sq.Sqlizer +} + +type DBScanner interface { + Next() bool + Value() *DBScannerEntry + Err() error +} + +func ScanDBEntryUntil(s DBScanner, ent *DBScannerEntry, p string) (*DBScannerEntry, error) { + for ent == nil || ent.Path < p { + if !s.Next() { + return nil, s.Err() + } + ent = s.Value() + } + return ent, nil +} diff --git a/catalog/diff.go b/catalog/diff.go index c78e0654885..a94e309afa4 100644 --- a/catalog/diff.go +++ b/catalog/diff.go @@ -7,6 +7,7 @@ const ( DifferenceTypeRemoved DifferenceTypeChanged DifferenceTypeConflict + DifferenceTypeNone ) type Difference struct { diff --git a/catalog/model.go b/catalog/model.go index 32061c50ac1..305ee534940 100644 --- a/catalog/model.go +++ b/catalog/model.go @@ -6,6 +6,10 @@ import ( "time" ) +const ( + DBEntryFieldChecksum = "checksum" +) + type Metadata map[string]string type Repository struct { @@ -88,11 +92,11 @@ func (j *Metadata) Scan(src interface{}) error { return json.Unmarshal(data, j) } -type DBReaderEntry struct { +type DBScannerEntry struct { BranchID int64 `db:"branch_id"` - Path string `db:"path"` + RowCtid string `db:"ctid"` MinMaxCommit - RowCtid string `db:"ctid"` + Entry } type MinMaxCommit struct { @@ -113,7 +117,7 @@ func (m MinMaxCommit) IsCommitted() bool { func (m MinMaxCommit) ChangedAfterCommit(commitID CommitID) bool { // needed for diff, to check if an entry changed after the lineage commit id - return m.MinCommit > commitID || (m.MaxCommit != MaxCommitID && m.MaxCommit > commitID) + return m.MinCommit > commitID || (m.IsDeleted() && m.MaxCommit >= commitID) } type entryPathPrefixInfo struct { diff --git a/catalog/views.go b/catalog/views.go index 8819403c5de..df18c562f18 100644 --- a/catalog/views.go +++ b/catalog/views.go @@ -96,100 +96,3 @@ func sqEntriesLineageV(branchID int64, requestedCommit CommitID, 
lineage []linea Column(maxCommitAlias).Column(isDeletedAlias) return baseSelect } - -func sqDiffFromChildV(parentID, childID int64, parentEffectiveCommit, childEffectiveCommit CommitID, parentUncommittedLineage []lineageCommit, childLineageValues string) sq.SelectBuilder { - lineage := sqEntriesLineage(parentID, UncommittedID, parentUncommittedLineage) - sqParent := sq.Select("*"). - FromSelect(lineage, "z"). - Where("displayed_branch = ?", parentID) - // Can diff with expired files, just not usefully! - fromChildInternalQ := sq.Select("s.path", - "s.is_deleted AS DifferenceTypeRemoved", - "f.path IS NOT NULL AND NOT f.is_deleted AS DifferenceTypeChanged", - "COALESCE(f.is_deleted, true) AND s.is_deleted AS both_deleted", - "f.path IS NOT NULL AND (f.physical_address = s.physical_address AND f.is_deleted = s.is_deleted) AS same_object", - "s.entry_ctid", - "f.source_branch", - ). - // Conflict detection - Column(`-- parent either created or deleted after last merge - conflict - f.path IS NOT NULL AND ( NOT f.is_committed OR -- uncommitted entries always new - (f.source_branch = ? AND -- it is the parent branch - not from lineage - ( f.min_commit > ? OR -- created after last merge - (f.max_commit >= ? AND f.is_deleted))) -- deleted after last merge - OR (f.source_branch != ? AND -- an entry from parent lineage - -- negative proof - if the child could see this object - than this is NOT a conflict - -- done by examining the child lineage against the parent object - NOT EXISTS ( SELECT * FROM`+childLineageValues+` WHERE - l.branch_id = f.source_branch AND - -- prove that ancestor entry was observable by the child - (l.commit_id >= f.min_commit AND - (l.commit_id > f.max_commit OR NOT f.is_deleted)) - ))) - AS DifferenceTypeConflict`, parentID, parentEffectiveCommit, parentEffectiveCommit, parentID). - FromSelect(sqEntriesV(CommittedID).Distinct(). - Options(" on (branch_id,path)"). - OrderBy("branch_id", "path", "min_commit desc"). - Where("branch_id = ? 
AND (min_commit >= ? OR max_commit >= ? and is_deleted)", childID, childEffectiveCommit, childEffectiveCommit), "s"). - JoinClause(sqParent.Prefix("LEFT JOIN (").Suffix(") AS f ON f.path = s.path")) - RemoveNonRelevantQ := sq.Select("*").FromSelect(fromChildInternalQ, "t").Where("NOT (same_object OR both_deleted)") - return sq.Select(). - Column(sq.Alias(sq.Case().When("DifferenceTypeConflict", "3"). - When("DifferenceTypeRemoved", "1"). - When("DifferenceTypeChanged", "2"). - Else("0"), "diff_type")). - Column("path").Column(sq.Alias(sq.Case(). - When("NOT(DifferenceTypeConflict OR DifferenceTypeRemoved)", "entry_ctid"). - Else("NULL"), - "entry_ctid")). - Column("source_branch"). - FromSelect(RemoveNonRelevantQ, "t1") -} - -func sqDiffFromParentV(parentID, childID int64, lastChildMergeWithParent CommitID, parentCommittedLineage, childUncommittedLineage []lineageCommit) sq.SelectBuilder { - childLineageValues := getLineageAsValues(childUncommittedLineage, childID, MaxCommitID) - childLineage := sqEntriesLineage(childID, UncommittedID, childUncommittedLineage) - sqChild := sq.Select("*"). - FromSelect(childLineage, "s"). - Where("displayed_branch = ?", childID) - - parentLineage := sqEntriesLineage(parentID, CommittedID, parentCommittedLineage) - // Can diff with expired files, just not usefully! - internalV := sq.Select("f.path", - "f.entry_ctid", - "f.is_deleted AS DifferenceTypeRemoved", - "s.path IS NOT NULL AND NOT s.is_deleted AS DifferenceTypeChanged", - "COALESCE(s.is_deleted, true) AND f.is_deleted AS both_deleted", - // both point to same object, and have the same deletion status - "s.path IS NOT NULL AND f.physical_address = s.physical_address AND f.is_deleted = s.is_deleted AS same_object"). - Column(`f.min_commit > l.commit_id -- parent created after commit - OR f.max_commit >= l.commit_id AND f.is_deleted -- parent deleted after commit - AS parent_changed`). 
// parent was changed if child could no "see" it - // this happens if min_commit is larger than the lineage commit - // or entry deletion max_commit is larger or equal than lineage commit - Column("s.path IS NOT NULL AND s.source_branch = ? as entry_in_child", childID). - Column(`s.path IS NOT NULL AND s.source_branch = ? AND - (NOT s.is_committed -- uncommitted is new - OR s.min_commit > ? -- created after last merge - OR (s.max_commit >= ? AND s.is_deleted)) -- deleted after last merge - AS DifferenceTypeConflict`, childID, lastChildMergeWithParent, lastChildMergeWithParent). - FromSelect(parentLineage, "f"). - Where("f.displayed_branch = ?", parentID). - JoinClause(sqChild.Prefix("LEFT JOIN (").Suffix(") AS s ON f.path = s.path")). - Join(`(SELECT * FROM ` + childLineageValues + `) l ON f.source_branch = l.branch_id`) - - RemoveNonRelevantQ := sq.Select("*"). - FromSelect(internalV, "t"). - Where("parent_changed AND NOT (same_object OR both_deleted)") - - return sq.Select(). - Column(sq.Alias(sq.Case().When("DifferenceTypeConflict", "3"). - When("DifferenceTypeRemoved", "1"). - When("DifferenceTypeChanged", "2"). - Else("0"), "diff_type")). - Column("path"). - Column(sq.Alias(sq.Case(). - When("entry_in_child AND NOT (DifferenceTypeRemoved OR DifferenceTypeConflict)", "entry_ctid"). - Else("NULL"), "entry_ctid")). - FromSelect(RemoveNonRelevantQ, "t1") -}