diff --git a/catalog/cataloger_diff.go b/catalog/cataloger_diff.go index e028fdf25ff..0031cff4062 100644 --- a/catalog/cataloger_diff.go +++ b/catalog/cataloger_diff.go @@ -206,7 +206,7 @@ func (c *cataloger) diffFromChild(ctx context.Context, tx db.Tx, childID, parent parentEffectiveQuery, args, err := psql.Select("commit_id as parent_effective_commit"). From("catalog_commits"). Where("branch_id = ? AND merge_source_branch = ?", childID, parentID). - OrderBy("commit_id"). + OrderBy("commit_id desc"). Limit(1). ToSql() if err != nil { diff --git a/catalog/db_batch_entry_read.go b/catalog/db_batch_entry_read.go index 5e57ee7e440..fae283394bd 100644 --- a/catalog/db_batch_entry_read.go +++ b/catalog/db_batch_entry_read.go @@ -123,25 +123,20 @@ func (c *cataloger) readEntriesBatch(wg *sync.WaitGroup, inputBatchChan chan bat func (c *cataloger) dbSelectBatchEntries(repository string, ref Ref, pathReqList []pathRequest) ([]*Entry, error) { res, err := c.db.Transact(func(tx db.Tx) (interface{}, error) { - // get branch branchID, err := c.getBranchIDCache(tx, repository, ref.Branch) if err != nil { return nil, err } - // get lineage - lineage, err := getLineage(tx, branchID, ref.CommitID) - if err != nil { - return nil, fmt.Errorf("get lineage: %w", err) - } // prepare list of paths p := make([]string, len(pathReqList)) for i, s := range pathReqList { p[i] = s.path } // prepare query - readExpr := sq.Select("path", "physical_address", "creation_date", "size", "checksum", "metadata", "is_expired"). - FromSelect(sqEntriesLineage(branchID, ref.CommitID, lineage), "entries"). 
- Where(sq.And{sq.Eq{"path": p}, sq.Expr("not is_deleted")}) + readExpr, err := sqEntryLineageSelect(tx, branchID, ref.CommitID, true, p) + if err != nil { + return nil, fmt.Errorf("lineage select: %w", err) + } query, args, err := readExpr.PlaceholderFormat(sq.Dollar).ToSql() if err != nil { return nil, fmt.Errorf("build sql: %w", err) diff --git a/catalog/db_read_entry.go b/catalog/db_read_entry.go new file mode 100644 index 00000000000..136803c8532 --- /dev/null +++ b/catalog/db_read_entry.go @@ -0,0 +1,69 @@ +package catalog + +import ( + "strconv" + + sq "github.com/Masterminds/squirrel" + "github.com/treeverse/lakefs/db" +) + +// sqEntryLineageSelect selects path(s) from a branch, including lineage +// 1. Union all the branches in the lineage. +// 2. If multiple branches have this path - select the one closest to the requested branch +// 3. filterDeleted param = true will remove results that were deleted +func sqEntryLineageSelect(tx db.Tx, branchID int64, commitID CommitID, filterDeleted bool, paths []string) (sq.SelectBuilder, error) { + lineage, err := getLineage(tx, branchID, commitID) + if err != nil { + return sq.SelectBuilder{}, err + } + unionSelect := sqEntryBranchSelect(branchID, commitID, paths).Column("? as lineage_order", "0"). + Prefix("(").Suffix(")") + for i, branch := range lineage { + unionSelect = unionSelect.SuffixExpr(sq.ConcatExpr( + " UNION ALL (", + sqEntryBranchSelect(branch.BranchID, branch.CommitID, paths).Column("? as lineage_order", strconv.Itoa(i+1)), + ")")) + } + distinctSelect := sq.Select("*"). + FromSelect(unionSelect, "c"). + Distinct().Options("ON (path)"). + OrderBy("path", "lineage_order") + finalSelect := sq.Select("path", "physical_address", "creation_date", "size", "checksum", "metadata", "is_expired"). + FromSelect(distinctSelect, "t") + if filterDeleted { + finalSelect = finalSelect.Where("max_commit = ?", MaxCommitID) + } + return finalSelect, nil +} + +// sqEntryBranchSelect selects path(s) from a single branch. 
+// 1. Get the requested commit +// 2. If a path has multiple versions in various commits - Return the row with highest min commit +// 3. If the version was deleted after the requested commit - the row max-commit will be set to uncommitted +func sqEntryBranchSelect(branchID int64, commitID CommitID, paths []string) sq.SelectBuilder { + rawSelect := sq.Select("path", "physical_address", "creation_date", "size", "checksum", "metadata", "is_expired"). + Distinct().Options("ON (branch_id,path)"). + From("catalog_entries"). + Where("branch_id = ?", branchID). + OrderBy("branch_id", "path", "min_commit desc") + l := len(paths) + // the case of single path is handled differently than multiple paths because my tests showed there is a significant + // difference (about 25%) between "where path = X" and "where path in (X)". even though the optimization is + // the same. may be a driver issue in the client application + if l == 1 { + rawSelect = rawSelect.Where("path = ?", paths[0]) + } else { + rawSelect = rawSelect.Where(sq.Eq{"path": paths}) + } + switch commitID { + case CommittedID: + rawSelect = rawSelect.Where("min_commit < ?", MaxCommitID). + Column("max_commit") + case UncommittedID: + rawSelect = rawSelect.Column("max_commit") + default: + rawSelect = rawSelect.Where("min_commit between 1 and ?", commitID). + Column("CASE WHEN max_commit >= ? THEN ? 
ELSE max_commit END AS max_commit", commitID, MaxCommitID) + } + return rawSelect +} diff --git a/catalog/views.go b/catalog/views.go index b1a08676a85..8819403c5de 100644 --- a/catalog/views.go +++ b/catalog/views.go @@ -146,14 +146,14 @@ func sqDiffFromChildV(parentID, childID int64, parentEffectiveCommit, childEffec FromSelect(RemoveNonRelevantQ, "t1") } -func sqDiffFromParentV(parentID, childID int64, lastChildMergeWithParent CommitID, parentUncommittedLineage, childUncommittedLineage []lineageCommit) sq.SelectBuilder { +func sqDiffFromParentV(parentID, childID int64, lastChildMergeWithParent CommitID, parentCommittedLineage, childUncommittedLineage []lineageCommit) sq.SelectBuilder { childLineageValues := getLineageAsValues(childUncommittedLineage, childID, MaxCommitID) childLineage := sqEntriesLineage(childID, UncommittedID, childUncommittedLineage) sqChild := sq.Select("*"). FromSelect(childLineage, "s"). Where("displayed_branch = ?", childID) - parentLineage := sqEntriesLineage(parentID, CommittedID, parentUncommittedLineage) + parentLineage := sqEntriesLineage(parentID, CommittedID, parentCommittedLineage) // Can diff with expired files, just not usefully! internalV := sq.Select("f.path", "f.entry_ctid",