Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

move all merge code to merge.go #458

Merged
merged 3 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,71 @@ import (
"io"
)

// MergeRowGroups constructs a row group which is a merged view of rowGroups. If
// rowGroups are sorted and the passed options include sorting, the merged row
// group will also be sorted.
//
// The function validates the input to ensure that the merge operation is
// possible, ensuring that the schemas match or can be converted to an
// optionally configured target schema passed as argument in the option list.
//
// The sorting columns of each row group are also consulted to determine whether
// the output can be represented. If sorting columns are configured on the merge
// they must be a prefix of sorting columns of all row groups being merged.
func MergeRowGroups(rowGroups []RowGroup, options ...RowGroupOption) (RowGroup, error) {
config, err := NewRowGroupConfig(options...)
if err != nil {
return nil, err
}

schema := config.Schema
if len(rowGroups) == 0 {
return newEmptyRowGroup(schema), nil
}
if schema == nil {
schema = rowGroups[0].Schema()

for _, rowGroup := range rowGroups[1:] {
if !nodesAreEqual(schema, rowGroup.Schema()) {
return nil, ErrRowGroupSchemaMismatch
}
}
}

mergedRowGroups := make([]RowGroup, len(rowGroups))
copy(mergedRowGroups, rowGroups)

for i, rowGroup := range mergedRowGroups {
if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) {
conv, err := Convert(schema, rowGroupSchema)
if err != nil {
return nil, fmt.Errorf("cannot merge row groups: %w", err)
}
mergedRowGroups[i] = ConvertRowGroup(rowGroup, conv)
}
}

m := &mergedRowGroup{sorting: config.Sorting.SortingColumns}
m.init(schema, mergedRowGroups)

if len(m.sorting) == 0 {
// When the row group has no ordering, use a simpler version of the
// merger which simply concatenates rows from each of the row groups.
// This is preferable because it makes the output deterministic, the
// heap merge may otherwise reorder rows across groups.
return &m.multiRowGroup, nil
}

for _, rowGroup := range m.rowGroups {
if !sortingColumnsHavePrefix(rowGroup.SortingColumns(), m.sorting) {
return nil, ErrRowGroupSortingColumnsMismatch
}
}

m.compare = compareRowsFuncOf(schema, m.sorting)
return m, nil
}

type mergedRowGroup struct {
multiRowGroup
sorting []SortingColumn
Expand Down Expand Up @@ -83,6 +148,8 @@ func (r *mergedRowGroupRows) Schema() *Schema {
return r.schema
}

// MergeRowReader constructs a RowReader which creates an ordered sequence of
// all the readers using the given compare function as the ordering predicate.
func MergeRowReaders(readers []RowReader, compare func(Row, Row) int) RowReader {
return &mergedRowReader{
compare: compare,
Expand Down
Loading