Skip to content

Commit

Permalink
Diff working in a separate go routine from merge. Prallel diff and merge
Browse files Browse the repository at this point in the history
  • Loading branch information
tzahij committed Oct 31, 2020
1 parent 063ce17 commit 7a78820
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 31 deletions.
88 changes: 57 additions & 31 deletions catalog/cataloger_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ import (
)

const (
MergeBatchSize = 128
MergeBatchSize = 10
MergeBatchChanBuffer = 128
)

type mergeBatchRecords []*diffResultRecord
type mergeBatchRecords struct {
err error
differences []*diffResultRecord
}

// Merge perform diff between two branches (left and right), apply changes on right branch and commit
// It uses the cataloger diff internal API to produce a temporary table that we delete at the end of a successful merge
Expand All @@ -32,7 +36,6 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
}

mergeResult := &MergeResult{}
summary := make(map[DifferenceType]int)
_, err := c.db.Transact(func(tx db.Tx) (interface{}, error) {
leftID, err := getBranchID(tx, repository, leftBranch, LockTypeUpdate)
if err != nil {
Expand Down Expand Up @@ -64,46 +67,36 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
case RelationTypeNotDirect:
return nil, fmt.Errorf("merge suported only between branches that are parent-child or child-parent: %w", ErrOperationNotPermitted)
}
scanner, err := NewDiffScanner(tx, params, relation)
if err != nil {
return nil, err
}
nextCommitID, err := getNextCommitID(tx)
if err != nil {
return nil, err
}
// do the merge based on the relation
previousMaxCommitID, err := getLastCommitIDByBranchID(tx, rightID)
if err != nil {
return nil, err
}

differences := make(mergeBatchRecords, 0, MergeBatchSize)
mergeBatchChan := make(chan mergeBatchRecords, MergeBatchChanBuffer)
var workerExitFlag bool
go c.diffWorker(params, relation, mergeBatchChan, &workerExitFlag)
summary := make(map[DifferenceType]int)
var rowsCounter int
for scanner.Next() {
v := scanner.Value()
summary[v.DiffType] = summary[v.DiffType] + 1
rowsCounter++
if v.DiffType == DifferenceTypeConflict {
return nil, ErrConflictFound
for buf := range mergeBatchChan {
if buf.err != nil {
return nil, buf.err
}
for _, d := range buf.differences {
summary[d.DiffType] = summary[d.DiffType] + 1
rowsCounter++
}
differences = append(differences, v)
if len(differences) >= MergeBatchSize {
err = applyDiffChangesToRightBranch(tx, differences, previousMaxCommitID, nextCommitID, rightID, relation)
if err != nil {
return nil, err
err = applyDiffChangesToRightBranch(tx, buf, previousMaxCommitID, nextCommitID, rightID, relation)
if err != nil {
workerExitFlag = true
for ok := true; ok; { // drain the channel, till the worker closes it
_, ok = <-mergeBatchChan
}
differences = differences[:0]
return nil, err
}
}
err = scanner.Error()
if err != nil {
return nil, err
}
err = applyDiffChangesToRightBranch(tx, differences, previousMaxCommitID, nextCommitID, rightID, relation)
if err != nil {
return nil, err
}
if message == "" {
message = fmt.Sprintf("Merge '%s' into '%s'", leftBranch, rightBranch)
}
Expand Down Expand Up @@ -136,6 +129,39 @@ func (c *cataloger) Merge(ctx context.Context, repository, leftBranch, rightBran
return mergeResult, err
}

func (c *cataloger) diffWorker(params doDiffParams, relation RelationType, mergeBatchChan chan mergeBatchRecords, exitFlag *bool) {
_, _ = c.db.Transact(func(tx db.Tx) (interface{}, error) {
defer close(mergeBatchChan)
mergeBatch := mergeBatchRecords{differences: make([]*diffResultRecord, 0, MergeBatchSize)}
scanner, err := NewDiffScanner(tx, params, relation)
if err != nil {
mergeBatch.err = err
mergeBatchChan <- mergeBatch
return nil, nil
}
for scanner.Next() {
if *exitFlag {
return nil, nil
}
v := scanner.Value()
mergeBatch.differences = append(mergeBatch.differences, v)
if v.DiffType == DifferenceTypeConflict {
mergeBatch.err = ErrConflictFound
mergeBatchChan <- mergeBatch
return nil, nil
}
if len(mergeBatch.differences) >= MergeBatchSize {
mergeBatchChan <- mergeBatch
//mergeBatch.differences = mergeBatch.differences[:0]
mergeBatch.differences = make([]*diffResultRecord, 0, MergeBatchSize)
}
}
mergeBatch.err = scanner.Error()
mergeBatchChan <- mergeBatch
return nil, nil
}, db.ReadOnly())
}

// hasCommitDifferences - Checks if the current commit id of target or source branch advanced since last merge
func hasCommitDifferences(tx db.Tx, leftID, rightID int64) (bool, error) {
var hasCommitDifferences bool
Expand Down Expand Up @@ -165,7 +191,7 @@ func applyDiffChangesToRightBranch(tx db.Tx, mergeBatch mergeBatchRecords, previ
paths := make([]string, 0, MergeBatchSize)
ctidArray := make([]string, 0, MergeBatchSize)
var tombstonePaths []string
for _, diffRec := range mergeBatch {
for _, diffRec := range mergeBatch.differences {
if diffRec.DiffType == DifferenceTypeRemoved || diffRec.DiffType == DifferenceTypeChanged {
paths = append(paths, diffRec.Entry.Path)
}
Expand Down
1 change: 1 addition & 0 deletions catalog/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ var (
ErrMissingDiffResultsIDInContext = errors.New("missing diff results id in context")
ErrInvalidValue = errors.New("invalid value")
ErrNonDirectNotSupported = errors.New("non direct diff not supported")
ErrMergeWorkerRollback = errors.New("merge worker exit because of diff error")
)

0 comments on commit 7a78820

Please sign in to comment.