Skip to content

Commit

Permalink
use unlogged table for cataloger diff results (#685)
Browse files Browse the repository at this point in the history
  • Loading branch information
nopcoder authored Oct 1, 2020
1 parent b7f48e0 commit 0f4fb7f
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 29 deletions.
89 changes: 72 additions & 17 deletions catalog/cataloger_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,29 @@ import (
"context"
"errors"
"fmt"
"strings"

sq "github.com/Masterminds/squirrel"
"github.com/google/uuid"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
)

type contextKey string

func (k contextKey) String() string {
return string(k)
}

const (
DiffMaxLimit = 1000

diffResultsTableName = "catalog_diff_results"
diffResultsTableNamePrefix = "catalog_diff_results"
contextDiffResultsKey contextKey = "diff_results_key"
)

var ErrMissingDiffResultsIDInContext = errors.New("missing diff results id in context")

func (c *cataloger) Diff(ctx context.Context, repository string, leftBranch string, rightBranch string, limit int, after string) (Differences, bool, error) {
if err := Validate(ValidateFields{
{Name: "repository", IsValid: ValidateRepositoryName(repository)},
Expand All @@ -28,6 +39,10 @@ func (c *cataloger) Diff(ctx context.Context, repository string, leftBranch stri
if limit < 0 || limit > DiffMaxLimit {
limit = DiffMaxLimit
}

ctx, cancel := c.withDiffResultsContext(ctx)
defer cancel()

res, err := c.db.Transact(func(tx db.Tx) (interface{}, error) {
leftID, err := c.getBranchIDCache(tx, repository, leftBranch)
if err != nil {
Expand All @@ -37,11 +52,11 @@ func (c *cataloger) Diff(ctx context.Context, repository string, leftBranch stri
if err != nil {
return nil, fmt.Errorf("right branch: %w", err)
}
err = c.doDiff(tx, leftID, rightID)
err = c.doDiff(ctx, tx, leftID, rightID)
if err != nil {
return nil, err
}
return getDiffDifferences(tx, limit+1, after)
return getDiffDifferences(ctx, tx, limit+1, after)
}, c.txOpts(ctx)...)
if err != nil {
return nil, false, err
Expand All @@ -51,22 +66,22 @@ func (c *cataloger) Diff(ctx context.Context, repository string, leftBranch stri
return differences, hasMore, nil
}

func (c *cataloger) doDiff(tx db.Tx, leftID, rightID int64) error {
func (c *cataloger) doDiff(ctx context.Context, tx db.Tx, leftID, rightID int64) error {
relation, err := getBranchesRelationType(tx, leftID, rightID)
if err != nil {
return err
}
return c.doDiffByRelation(tx, relation, leftID, rightID)
return c.doDiffByRelation(ctx, tx, relation, leftID, rightID)
}

func (c *cataloger) doDiffByRelation(tx db.Tx, relation RelationType, leftID, rightID int64) error {
func (c *cataloger) doDiffByRelation(ctx context.Context, tx db.Tx, relation RelationType, leftID, rightID int64) error {
switch relation {
case RelationTypeFromParent:
return c.diffFromParent(tx, leftID, rightID)
return c.diffFromParent(ctx, tx, leftID, rightID)
case RelationTypeFromChild:
return c.diffFromChild(tx, leftID, rightID)
return c.diffFromChild(ctx, tx, leftID, rightID)
case RelationTypeNotDirect:
return c.diffNonDirect(tx, leftID, rightID)
return c.diffNonDirect(ctx, tx, leftID, rightID)
default:
c.log.WithFields(logging.Fields{
"relation_type": relation,
Expand All @@ -77,12 +92,16 @@ func (c *cataloger) doDiffByRelation(tx db.Tx, relation RelationType, leftID, ri
}
}

func (c *cataloger) getDiffSummary(tx db.Tx) (map[DifferenceType]int, error) {
func (c *cataloger) getDiffSummary(ctx context.Context, tx db.Tx) (map[DifferenceType]int, error) {
var results []struct {
DiffType int `db:"diff_type"`
Count int `db:"count"`
}
err := tx.Select(&results, "SELECT diff_type, count(diff_type) as count FROM "+diffResultsTableName+" GROUP BY diff_type")
diffResultsTableName, err := diffResultsTableNameFromContext(ctx)
if err != nil {
return nil, err
}
err = tx.Select(&results, "SELECT diff_type, count(diff_type) as count FROM "+diffResultsTableName+" GROUP BY diff_type")
if err != nil {
return nil, fmt.Errorf("count diff resutls by type: %w", err)
}
Expand All @@ -93,7 +112,7 @@ func (c *cataloger) getDiffSummary(tx db.Tx) (map[DifferenceType]int, error) {
return m, nil
}

func (c *cataloger) diffFromParent(tx db.Tx, parentID, childID int64) error {
func (c *cataloger) diffFromParent(ctx context.Context, tx db.Tx, parentID, childID int64) error {
// get the last child commit number of the last parent merge
// if there is none - then it is the first merge
var maxChildMerge CommitID
Expand All @@ -117,8 +136,12 @@ func (c *cataloger) diffFromParent(tx db.Tx, parentID, childID int64) error {
if err != nil {
return fmt.Errorf("get child last commit failed: %w", err)
}
diffResultsTableName, err := diffResultsTableNameFromContext(ctx)
if err != nil {
return err
}
diffFromParentSQL, args, err := sqDiffFromParentV(parentID, childID, maxChildMerge, parentLineage, childLineage).
Prefix(`CREATE TEMP TABLE ` + diffResultsTableName + " ON COMMIT DROP AS ").
Prefix(`CREATE UNLOGGED TABLE ` + diffResultsTableName + " AS ").
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
Expand All @@ -130,7 +153,11 @@ func (c *cataloger) diffFromParent(tx db.Tx, parentID, childID int64) error {
return nil
}

func getDiffDifferences(tx db.Tx, limit int, after string) (Differences, error) {
func getDiffDifferences(ctx context.Context, tx db.Tx, limit int, after string) (Differences, error) {
diffResultsTableName, err := diffResultsTableNameFromContext(ctx)
if err != nil {
return nil, err
}
var result Differences
query, args, err := psql.Select("diff_type", "path").
From(diffResultsTableName).
Expand All @@ -148,7 +175,7 @@ func getDiffDifferences(tx db.Tx, limit int, after string) (Differences, error)
return result, nil
}

func (c *cataloger) diffFromChild(tx db.Tx, childID, parentID int64) error {
func (c *cataloger) diffFromChild(ctx context.Context, tx db.Tx, childID, parentID int64) error {
// read last merge commit numbers from commit table
// if it is the first child-to-parent commit, than those commit numbers are calculated as follows:
// the child is 0, as any change in the child was never merged to the parent.
Expand Down Expand Up @@ -202,8 +229,12 @@ func (c *cataloger) diffFromChild(tx db.Tx, childID, parentID int64) error {

childLineageValues := getLineageAsValues(childLineage, childID, MaxCommitID)
mainDiffFromChild := sqDiffFromChildV(parentID, childID, effectiveCommits.ParentEffectiveCommit, effectiveCommits.ChildEffectiveCommit, parentLineage, childLineageValues)
diffResultsTableName, err := diffResultsTableNameFromContext(ctx)
if err != nil {
return err
}
diffFromChildSQL, args, err := mainDiffFromChild.
Prefix("CREATE TEMP TABLE " + diffResultsTableName + " ON COMMIT DROP AS").
Prefix("CREATE UNLOGGED TABLE " + diffResultsTableName + " AS ").
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
Expand All @@ -215,7 +246,31 @@ func (c *cataloger) diffFromChild(tx db.Tx, childID, parentID int64) error {
return nil
}

func (c *cataloger) diffNonDirect(_ db.Tx, leftID, rightID int64) error {
// withDiffResultsContext generate diff results id used for temporary table name
func (c *cataloger) withDiffResultsContext(ctx context.Context) (context.Context, context.CancelFunc) {
id := strings.ReplaceAll(uuid.New().String(), "-", "")
return context.WithValue(ctx, contextDiffResultsKey, id), func() {
tableName := diffResultsTableNameFormat(id)
_, err := c.db.Exec("DROP TABLE IF EXISTS " + tableName)
if err != nil {
c.log.WithError(err).WithField("table_name", tableName).Warn("Failed to drop diff results table")
}
}
}

func diffResultsTableNameFromContext(ctx context.Context) (string, error) {
id, ok := ctx.Value(contextDiffResultsKey).(string)
if !ok {
return "", ErrMissingDiffResultsIDInContext
}
return diffResultsTableNameFormat(id), nil
}

func diffResultsTableNameFormat(id string) string {
return diffResultsTableNamePrefix + "_" + id
}

func (c *cataloger) diffNonDirect(_ context.Context, _ db.Tx, leftID, rightID int64) error {
c.log.WithFields(logging.Fields{
"left_id": leftID,
"right_id": rightID,
Expand Down
35 changes: 23 additions & 12 deletions catalog/cataloger_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
return nil, err
}

ctx, cancel := c.withDiffResultsContext(ctx)
defer cancel()

mergeResult := &MergeResult{}
_, err := c.db.Transact(func(tx db.Tx) (interface{}, error) {
leftID, err := getBranchID(tx, repository, leftBranch, LockTypeUpdate)
Expand All @@ -36,11 +39,11 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
return nil, fmt.Errorf("branch relation: %w", err)
}

err = c.doDiffByRelation(tx, relation, leftID, rightID)
err = c.doDiffByRelation(ctx, tx, relation, leftID, rightID)
if err != nil {
return nil, err
}
mergeResult.Summary, err = c.getDiffSummary(tx)
mergeResult.Summary, err = c.getDiffSummary(ctx, tx)
if err != nil {
return nil, err
}
Expand All @@ -66,7 +69,7 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
if message == "" {
message = formatMergeMessage(leftBranch, rightBranch)
}
commitID, err := c.doMergeByRelation(tx, relation, leftID, rightID, committer, message, metadata)
commitID, err := c.doMergeByRelation(ctx, tx, relation, leftID, rightID, committer, message, metadata)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -104,7 +107,7 @@ func formatMergeMessage(leftBranch string, rightBranch string) string {
return fmt.Sprintf("Merge '%s' into '%s'", leftBranch, rightBranch)
}

func (c *cataloger) doMergeByRelation(tx db.Tx, relation RelationType, leftID, rightID int64, committer string, msg string, metadata Metadata) (CommitID, error) {
func (c *cataloger) doMergeByRelation(ctx context.Context, tx db.Tx, relation RelationType, leftID, rightID int64, committer string, msg string, metadata Metadata) (CommitID, error) {
nextCommitID, err := getNextCommitID(tx)
if err != nil {
return 0, err
Expand All @@ -117,11 +120,11 @@ func (c *cataloger) doMergeByRelation(tx db.Tx, relation RelationType, leftID, r
}
switch relation {
case RelationTypeFromParent:
err = c.mergeFromParent(tx, previousMaxCommitID, nextCommitID, leftID, rightID, committer, msg, metadata)
err = c.mergeFromParent(ctx, tx, previousMaxCommitID, nextCommitID, leftID, rightID, committer, msg, metadata)
case RelationTypeFromChild:
err = c.mergeFromChild(tx, previousMaxCommitID, nextCommitID, leftID, rightID, committer, msg, metadata)
err = c.mergeFromChild(ctx, tx, previousMaxCommitID, nextCommitID, leftID, rightID, committer, msg, metadata)
case RelationTypeNotDirect:
err = c.mergeNonDirect(tx, previousMaxCommitID, nextCommitID, leftID, rightID, committer, msg, metadata)
err = c.mergeNonDirect(ctx, tx, previousMaxCommitID, nextCommitID, leftID, rightID, committer, msg, metadata)
default:
return 0, ErrUnsupportedRelation
}
Expand All @@ -131,8 +134,12 @@ func (c *cataloger) doMergeByRelation(tx db.Tx, relation RelationType, leftID, r
return nextCommitID, nil
}

func (c *cataloger) mergeFromParent(tx db.Tx, previousMaxCommitID, nextCommitID CommitID, parentID, childID int64, committer string, msg string, metadata Metadata) error {
_, err := tx.Exec(`UPDATE catalog_entries SET max_commit = $2
func (c *cataloger) mergeFromParent(ctx context.Context, tx db.Tx, previousMaxCommitID, nextCommitID CommitID, parentID, childID int64, committer, msg string, metadata Metadata) error {
diffResultsTableName, err := diffResultsTableNameFromContext(ctx)
if err != nil {
return err
}
_, err = tx.Exec(`UPDATE catalog_entries SET max_commit = $2
WHERE branch_id = $1 AND max_commit = catalog_max_commit_id() AND path in (SELECT path FROM `+diffResultsTableName+` WHERE diff_type IN ($3,$4))`,
childID, previousMaxCommitID, DifferenceTypeRemoved, DifferenceTypeChanged)
if err != nil {
Expand Down Expand Up @@ -177,9 +184,13 @@ func (c *cataloger) mergeFromParent(tx db.Tx, previousMaxCommitID, nextCommitID
return nil
}

func (c *cataloger) mergeFromChild(tx db.Tx, previousMaxCommitID, nextCommitID CommitID, childID int64, parentID int64, committer string, msg string, metadata Metadata) error {
func (c *cataloger) mergeFromChild(ctx context.Context, tx db.Tx, previousMaxCommitID, nextCommitID CommitID, childID int64, parentID int64, committer string, msg string, metadata Metadata) error {
// DifferenceTypeRemoved and DifferenceTypeChanged - set max_commit the our commit for committed entries in parent branch
_, err := tx.Exec(`UPDATE catalog_entries SET max_commit = $2
diffResultsTableName, err := diffResultsTableNameFromContext(ctx)
if err != nil {
return err
}
_, err = tx.Exec(`UPDATE catalog_entries SET max_commit = $2
WHERE branch_id = $1 AND max_commit = catalog_max_commit_id()
AND path in (SELECT path FROM `+diffResultsTableName+` WHERE diff_type IN ($3,$4))`,
parentID, previousMaxCommitID, DifferenceTypeRemoved, DifferenceTypeChanged)
Expand Down Expand Up @@ -215,7 +226,7 @@ func (c *cataloger) mergeFromChild(tx db.Tx, previousMaxCommitID, nextCommitID C
return err
}

func (c *cataloger) mergeNonDirect(_ sqlx.Execer, previousMaxCommitID, nextCommitID CommitID, leftID, rightID int64, committer string, msg string, _ Metadata) error {
func (c *cataloger) mergeNonDirect(_ context.Context, _ sqlx.Execer, previousMaxCommitID, nextCommitID CommitID, leftID, rightID int64, committer, msg string, _ Metadata) error {
c.log.WithFields(logging.Fields{
"commit_id": previousMaxCommitID,
"next_commit_id": nextCommitID,
Expand Down

0 comments on commit 0f4fb7f

Please sign in to comment.