diff --git a/merge.go b/merge.go index 114fc1c8..451ae1d3 100644 --- a/merge.go +++ b/merge.go @@ -6,6 +6,71 @@ import ( "io" ) +// MergeRowGroups constructs a row group which is a merged view of rowGroups. If +// rowGroups are sorted and the passed options include sorting, the merged row +// group will also be sorted. +// +// The function validates the input to ensure that the merge operation is +// possible, ensuring that the schemas match or can be converted to an +// optionally configured target schema passed as argument in the option list. +// +// The sorting columns of each row group are also consulted to determine whether +// the output can be represented. If sorting columns are configured on the merge +// they must be a prefix of sorting columns of all row groups being merged. +func MergeRowGroups(rowGroups []RowGroup, options ...RowGroupOption) (RowGroup, error) { + config, err := NewRowGroupConfig(options...) + if err != nil { + return nil, err + } + + schema := config.Schema + if len(rowGroups) == 0 { + return newEmptyRowGroup(schema), nil + } + if schema == nil { + schema = rowGroups[0].Schema() + + for _, rowGroup := range rowGroups[1:] { + if !nodesAreEqual(schema, rowGroup.Schema()) { + return nil, ErrRowGroupSchemaMismatch + } + } + } + + mergedRowGroups := make([]RowGroup, len(rowGroups)) + copy(mergedRowGroups, rowGroups) + + for i, rowGroup := range mergedRowGroups { + if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) { + conv, err := Convert(schema, rowGroupSchema) + if err != nil { + return nil, fmt.Errorf("cannot merge row groups: %w", err) + } + mergedRowGroups[i] = ConvertRowGroup(rowGroup, conv) + } + } + + m := &mergedRowGroup{sorting: config.Sorting.SortingColumns} + m.init(schema, mergedRowGroups) + + if len(m.sorting) == 0 { + // When the row group has no ordering, use a simpler version of the + // merger which simply concatenates rows from each of the row groups. 
+ // This is preferable because it makes the output deterministic, the + // heap merge may otherwise reorder rows across groups. + return &m.multiRowGroup, nil + } + + for _, rowGroup := range m.rowGroups { + if !sortingColumnsHavePrefix(rowGroup.SortingColumns(), m.sorting) { + return nil, ErrRowGroupSortingColumnsMismatch + } + } + + m.compare = compareRowsFuncOf(schema, m.sorting) + return m, nil +} + type mergedRowGroup struct { multiRowGroup sorting []SortingColumn @@ -83,6 +148,8 @@ func (r *mergedRowGroupRows) Schema() *Schema { return r.schema } +// MergeRowReaders constructs a RowReader which creates an ordered sequence of +// all the readers using the given compare function as the ordering predicate. func MergeRowReaders(readers []RowReader, compare func(Row, Row) int) RowReader { return &mergedRowReader{ compare: compare, diff --git a/merge_test.go b/merge_test.go index ab8cbe5a..16c21ea3 100644 --- a/merge_test.go +++ b/merge_test.go @@ -36,6 +36,342 @@ func (r *wrappedRows) Close() error { return r.Rows.Close() } +func TestMergeRowGroups(t *testing.T) { + tests := []struct { + scenario string + options []parquet.RowGroupOption + input []parquet.RowGroup + output parquet.RowGroup + }{ + { + scenario: "no row groups", + options: []parquet.RowGroupOption{ + parquet.SchemaOf(Person{}), + }, + output: sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SchemaOf(Person{}), + }, + ), + }, + + { + scenario: "a single row group", + input: []parquet.RowGroup{ + sortedRowGroup(nil, + Person{FirstName: "some", LastName: "one", Age: 30}, + Person{FirstName: "some", LastName: "one else", Age: 31}, + Person{FirstName: "and", LastName: "you", Age: 32}, + ), + }, + output: sortedRowGroup(nil, + Person{FirstName: "some", LastName: "one", Age: 30}, + Person{FirstName: "some", LastName: "one else", Age: 31}, + Person{FirstName: "and", LastName: "you", Age: 32}, + ), + }, + + { + scenario: "two row groups without ordering", + input: []parquet.RowGroup{ +
sortedRowGroup(nil, Person{FirstName: "some", LastName: "one", Age: 30}), + sortedRowGroup(nil, Person{FirstName: "some", LastName: "one else", Age: 31}), + }, + output: sortedRowGroup(nil, + Person{FirstName: "some", LastName: "one", Age: 30}, + Person{FirstName: "some", LastName: "one else", Age: 31}, + ), + }, + + { + scenario: "three row groups without ordering", + input: []parquet.RowGroup{ + sortedRowGroup(nil, Person{FirstName: "some", LastName: "one", Age: 30}), + sortedRowGroup(nil, Person{FirstName: "some", LastName: "one else", Age: 31}), + sortedRowGroup(nil, Person{FirstName: "question", LastName: "answer", Age: 42}), + }, + output: sortedRowGroup(nil, + Person{FirstName: "some", LastName: "one", Age: 30}, + Person{FirstName: "some", LastName: "one else", Age: 31}, + Person{FirstName: "question", LastName: "answer", Age: 42}, + ), + }, + + { + scenario: "row groups sorted by ascending last name", + options: []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + input: []parquet.RowGroup{ + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + Person{FirstName: "Han", LastName: "Solo"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + ), + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + ), + }, + output: sortedRowGroup(nil, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + Person{FirstName: "Han", LastName: "Solo"}, + ), + }, + + { + scenario: "row groups sorted by descending last name", + options: []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Descending("LastName"), + ), + ), + }, + input: []parquet.RowGroup{ + 
sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Descending("LastName"), + ), + ), + }, + Person{FirstName: "Han", LastName: "Solo"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + ), + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Descending("LastName"), + ), + ), + }, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + ), + }, + output: sortedRowGroup(nil, + Person{FirstName: "Han", LastName: "Solo"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + ), + }, + + { + scenario: "row groups sorted by ascending last and first name", + options: []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + parquet.Ascending("FirstName"), + ), + ), + }, + input: []parquet.RowGroup{ + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + parquet.Ascending("FirstName"), + ), + ), + }, + Person{FirstName: "Luke", LastName: "Skywalker"}, + Person{FirstName: "Han", LastName: "Solo"}, + ), + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + parquet.Ascending("FirstName"), + ), + ), + }, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + Person{FirstName: "Anakin", LastName: "Skywalker"}, + ), + }, + output: sortedRowGroup(nil, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + Person{FirstName: "Anakin", LastName: "Skywalker"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + Person{FirstName: "Han", LastName: "Solo"}, + ), + }, + + { + scenario: "row groups with conversion to a different schema", + options: []parquet.RowGroupOption{ + parquet.SchemaOf(LastNameOnly{}), + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + 
parquet.Ascending("LastName"), + ), + ), + }, + input: []parquet.RowGroup{ + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + Person{FirstName: "Han", LastName: "Solo"}, + Person{FirstName: "Luke", LastName: "Skywalker"}, + ), + sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + Person{FirstName: "Obiwan", LastName: "Kenobi"}, + Person{FirstName: "Anakin", LastName: "Skywalker"}, + ), + }, + output: sortedRowGroup( + []parquet.RowGroupOption{ + parquet.SortingRowGroupConfig( + parquet.SortingColumns( + parquet.Ascending("LastName"), + ), + ), + }, + LastNameOnly{LastName: "Solo"}, + LastNameOnly{LastName: "Skywalker"}, + LastNameOnly{LastName: "Skywalker"}, + LastNameOnly{LastName: "Kenobi"}, + ), + }, + } + + for _, adapter := range []struct { + scenario string + function func(parquet.RowGroup) parquet.RowGroup + }{ + {scenario: "buffer", function: selfRowGroup}, + {scenario: "file", function: fileRowGroup}, + } { + t.Run(adapter.scenario, func(t *testing.T) { + for _, test := range tests { + t.Run(test.scenario, func(t *testing.T) { + input := make([]parquet.RowGroup, len(test.input)) + for i := range test.input { + input[i] = adapter.function(test.input[i]) + } + + merged, err := parquet.MergeRowGroups(test.input, test.options...) + if err != nil { + t.Fatal(err) + } + if merged.NumRows() != test.output.NumRows() { + t.Fatalf("the number of rows mismatch: want=%d got=%d", merged.NumRows(), test.output.NumRows()) + } + if merged.Schema() != test.output.Schema() { + t.Fatalf("the row group schemas mismatch:\n%v\n%v", test.output.Schema(), merged.Schema()) + } + + options := []parquet.RowGroupOption{parquet.SchemaOf(Person{})} + options = append(options, test.options...) 
+ // We test two views of the resulting row group: the one originally + // returned by MergeRowGroups, and one where the merged row group + // has been copied into a new buffer. The intent is to exercise both + // the row-by-row read as well as optimized code paths when CopyRows + // bypasses the ReadRow/WriteRow calls and the row group is written + // directly to the buffer by calling WriteRowsTo/WriteRowGroup. + mergedCopy := parquet.NewBuffer(options...) + + totalRows := test.output.NumRows() + numRows, err := copyRowsAndClose(mergedCopy, merged.Rows()) + if err != nil { + t.Fatal(err) + } + if numRows != totalRows { + t.Fatalf("wrong number of rows copied: want=%d got=%d", totalRows, numRows) + } + + for _, merge := range []struct { + scenario string + rowGroup parquet.RowGroup + }{ + {scenario: "self", rowGroup: merged}, + {scenario: "copy", rowGroup: mergedCopy}, + } { + t.Run(merge.scenario, func(t *testing.T) { + var expectedRows = test.output.Rows() + var mergedRows = merge.rowGroup.Rows() + var row1 = make([]parquet.Row, 1) + var row2 = make([]parquet.Row, 1) + var numRows int64 + + defer expectedRows.Close() + defer mergedRows.Close() + + for { + _, err1 := expectedRows.ReadRows(row1) + n, err2 := mergedRows.ReadRows(row2) + + if err1 != err2 { + // ReadRows may or may not return io.EOF + // when it reads the last row, so we test + // that the reference RowReader has also + // reached the end. 
+ if err1 == nil && err2 == io.EOF { + _, err1 = expectedRows.ReadRows(row1[:0]) + } + if err1 != io.EOF { + t.Fatalf("errors mismatched while comparing row %d/%d: want=%v got=%v", numRows, totalRows, err1, err2) + } + } + + if n != 0 { + if !row1[0].Equal(row2[0]) { + t.Errorf("row at index %d/%d mismatch: want=%+v got=%+v", numRows, totalRows, row1[0], row2[0]) + } + numRows++ + } + + if err1 != nil { + break + } + } + + if numRows != totalRows { + t.Errorf("expected to read %d rows but %d were found", totalRows, numRows) + } + }) + } + + }) + } + }) + } +} + func TestMergeRowGroupsCursorsAreClosed(t *testing.T) { type model struct { A int diff --git a/row_builder_test.go b/row_builder_test.go index e4dfe614..526c5f94 100644 --- a/row_builder_test.go +++ b/row_builder_test.go @@ -1,11 +1,34 @@ package parquet_test import ( + "fmt" "testing" "github.com/segmentio/parquet-go" ) +func ExampleRowBuilder() { + builder := parquet.NewRowBuilder(parquet.Group{ + "birth_date": parquet.Optional(parquet.Date()), + "first_name": parquet.String(), + "last_name": parquet.String(), + }) + + builder.Add(1, parquet.ByteArrayValue([]byte("Luke"))) + builder.Add(2, parquet.ByteArrayValue([]byte("Skywalker"))) + + row := builder.Row() + row.Range(func(columnIndex int, columnValues []parquet.Value) bool { + fmt.Printf("%+v\n", columnValues[0]) + return true + }) + + // Output: + // C:0 D:0 R:0 V: + // C:1 D:0 R:0 V:Luke + // C:2 D:0 R:0 V:Skywalker +} + func TestRowBuilder(t *testing.T) { type ( operation = func(*parquet.RowBuilder) @@ -247,3 +270,17 @@ func TestRowBuilder(t *testing.T) { }) } } + +func BenchmarkRowBuilderAdd(b *testing.B) { + builder := parquet.NewRowBuilder(parquet.Group{ + "ids": parquet.Repeated(parquet.Int(64)), + }) + + for i := 0; i < b.N; i++ { + builder.Add(0, parquet.Int64Value(int64(i))) + + if (i % 128) == 0 { + builder.Reset() // so don't run out of memory ;) + } + } +} diff --git a/row_group.go b/row_group.go index c95afb69..3cda08cb 100644 --- 
a/row_group.go +++ b/row_group.go @@ -152,71 +152,6 @@ func sortingColumnsAreEqual(s1, s2 SortingColumn) bool { return path1.equal(path2) && s1.Descending() == s2.Descending() && s1.NullsFirst() == s2.NullsFirst() } -// MergeRowGroups constructs a row group which is a merged view of rowGroups. If -// rowGroups are sorted and the passed options include sorting, the merged row -// group will also be sorted. -// -// The function validates the input to ensure that the merge operation is -// possible, ensuring that the schemas match or can be converted to an -// optionally configured target schema passed as argument in the option list. -// -// The sorting columns of each row group are also consulted to determine whether -// the output can be represented. If sorting columns are configured on the merge -// they must be a prefix of sorting columns of all row groups being merged. -func MergeRowGroups(rowGroups []RowGroup, options ...RowGroupOption) (RowGroup, error) { - config, err := NewRowGroupConfig(options...) 
- if err != nil { - return nil, err - } - - schema := config.Schema - if len(rowGroups) == 0 { - return newEmptyRowGroup(schema), nil - } - if schema == nil { - schema = rowGroups[0].Schema() - - for _, rowGroup := range rowGroups[1:] { - if !nodesAreEqual(schema, rowGroup.Schema()) { - return nil, ErrRowGroupSchemaMismatch - } - } - } - - mergedRowGroups := make([]RowGroup, len(rowGroups)) - copy(mergedRowGroups, rowGroups) - - for i, rowGroup := range mergedRowGroups { - if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) { - conv, err := Convert(schema, rowGroupSchema) - if err != nil { - return nil, fmt.Errorf("cannot merge row groups: %w", err) - } - mergedRowGroups[i] = ConvertRowGroup(rowGroup, conv) - } - } - - m := &mergedRowGroup{sorting: config.Sorting.SortingColumns} - m.init(schema, mergedRowGroups) - - if len(m.sorting) == 0 { - // When the row group has no ordering, use a simpler version of the - // merger which simply concatenates rows from each of the row groups. - // This is preferable because it makes the output deterministic, the - // heap merge may otherwise reorder rows across groups. 
- return &m.multiRowGroup, nil - } - - for _, rowGroup := range m.rowGroups { - if !sortingColumnsHavePrefix(rowGroup.SortingColumns(), m.sorting) { - return nil, ErrRowGroupSortingColumnsMismatch - } - } - - m.compare = compareRowsFuncOf(schema, m.sorting) - return m, nil -} - type rowGroup struct { schema *Schema numRows int64 diff --git a/row_group_test.go b/row_group_test.go index cf3940e2..c52b3f5f 100644 --- a/row_group_test.go +++ b/row_group_test.go @@ -123,342 +123,6 @@ func fileRowGroup(rowGroup parquet.RowGroup) parquet.RowGroup { return f.RowGroups()[0] } -func TestMergeRowGroups(t *testing.T) { - tests := []struct { - scenario string - options []parquet.RowGroupOption - input []parquet.RowGroup - output parquet.RowGroup - }{ - { - scenario: "no row groups", - options: []parquet.RowGroupOption{ - parquet.SchemaOf(Person{}), - }, - output: sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SchemaOf(Person{}), - }, - ), - }, - - { - scenario: "a single row group", - input: []parquet.RowGroup{ - sortedRowGroup(nil, - Person{FirstName: "some", LastName: "one", Age: 30}, - Person{FirstName: "some", LastName: "one else", Age: 31}, - Person{FirstName: "and", LastName: "you", Age: 32}, - ), - }, - output: sortedRowGroup(nil, - Person{FirstName: "some", LastName: "one", Age: 30}, - Person{FirstName: "some", LastName: "one else", Age: 31}, - Person{FirstName: "and", LastName: "you", Age: 32}, - ), - }, - - { - scenario: "two row groups without ordering", - input: []parquet.RowGroup{ - sortedRowGroup(nil, Person{FirstName: "some", LastName: "one", Age: 30}), - sortedRowGroup(nil, Person{FirstName: "some", LastName: "one else", Age: 31}), - }, - output: sortedRowGroup(nil, - Person{FirstName: "some", LastName: "one", Age: 30}, - Person{FirstName: "some", LastName: "one else", Age: 31}, - ), - }, - - { - scenario: "three row groups without ordering", - input: []parquet.RowGroup{ - sortedRowGroup(nil, Person{FirstName: "some", LastName: "one", Age: 30}), - 
sortedRowGroup(nil, Person{FirstName: "some", LastName: "one else", Age: 31}), - sortedRowGroup(nil, Person{FirstName: "question", LastName: "answer", Age: 42}), - }, - output: sortedRowGroup(nil, - Person{FirstName: "some", LastName: "one", Age: 30}, - Person{FirstName: "some", LastName: "one else", Age: 31}, - Person{FirstName: "question", LastName: "answer", Age: 42}, - ), - }, - - { - scenario: "row groups sorted by ascending last name", - options: []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - input: []parquet.RowGroup{ - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - Person{FirstName: "Han", LastName: "Solo"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - ), - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - ), - }, - output: sortedRowGroup(nil, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - Person{FirstName: "Han", LastName: "Solo"}, - ), - }, - - { - scenario: "row groups sorted by descending last name", - options: []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Descending("LastName"), - ), - ), - }, - input: []parquet.RowGroup{ - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Descending("LastName"), - ), - ), - }, - Person{FirstName: "Han", LastName: "Solo"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - ), - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Descending("LastName"), - ), - ), - }, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - ), - }, - output: 
sortedRowGroup(nil, - Person{FirstName: "Han", LastName: "Solo"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - ), - }, - - { - scenario: "row groups sorted by ascending last and first name", - options: []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - parquet.Ascending("FirstName"), - ), - ), - }, - input: []parquet.RowGroup{ - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - parquet.Ascending("FirstName"), - ), - ), - }, - Person{FirstName: "Luke", LastName: "Skywalker"}, - Person{FirstName: "Han", LastName: "Solo"}, - ), - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - parquet.Ascending("FirstName"), - ), - ), - }, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - Person{FirstName: "Anakin", LastName: "Skywalker"}, - ), - }, - output: sortedRowGroup(nil, - Person{FirstName: "Obiwan", LastName: "Kenobi"}, - Person{FirstName: "Anakin", LastName: "Skywalker"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - Person{FirstName: "Han", LastName: "Solo"}, - ), - }, - - { - scenario: "row groups with conversion to a different schema", - options: []parquet.RowGroupOption{ - parquet.SchemaOf(LastNameOnly{}), - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - input: []parquet.RowGroup{ - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - Person{FirstName: "Han", LastName: "Solo"}, - Person{FirstName: "Luke", LastName: "Skywalker"}, - ), - sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - 
Person{FirstName: "Obiwan", LastName: "Kenobi"}, - Person{FirstName: "Anakin", LastName: "Skywalker"}, - ), - }, - output: sortedRowGroup( - []parquet.RowGroupOption{ - parquet.SortingRowGroupConfig( - parquet.SortingColumns( - parquet.Ascending("LastName"), - ), - ), - }, - LastNameOnly{LastName: "Solo"}, - LastNameOnly{LastName: "Skywalker"}, - LastNameOnly{LastName: "Skywalker"}, - LastNameOnly{LastName: "Kenobi"}, - ), - }, - } - - for _, adapter := range []struct { - scenario string - function func(parquet.RowGroup) parquet.RowGroup - }{ - {scenario: "buffer", function: selfRowGroup}, - {scenario: "file", function: fileRowGroup}, - } { - t.Run(adapter.scenario, func(t *testing.T) { - for _, test := range tests { - t.Run(test.scenario, func(t *testing.T) { - input := make([]parquet.RowGroup, len(test.input)) - for i := range test.input { - input[i] = adapter.function(test.input[i]) - } - - merged, err := parquet.MergeRowGroups(test.input, test.options...) - if err != nil { - t.Fatal(err) - } - if merged.NumRows() != test.output.NumRows() { - t.Fatalf("the number of rows mismatch: want=%d got=%d", merged.NumRows(), test.output.NumRows()) - } - if merged.Schema() != test.output.Schema() { - t.Fatalf("the row group schemas mismatch:\n%v\n%v", test.output.Schema(), merged.Schema()) - } - - options := []parquet.RowGroupOption{parquet.SchemaOf(Person{})} - options = append(options, test.options...) - // We test two views of the resulting row group: the one originally - // returned by MergeRowGroups, and one where the merged row group - // has been copied into a new buffer. The intent is to exercise both - // the row-by-row read as well as optimized code paths when CopyRows - // bypasses the ReadRow/WriteRow calls and the row group is written - // directly to the buffer by calling WriteRowsTo/WriteRowGroup. - mergedCopy := parquet.NewBuffer(options...) 
- - totalRows := test.output.NumRows() - numRows, err := copyRowsAndClose(mergedCopy, merged.Rows()) - if err != nil { - t.Fatal(err) - } - if numRows != totalRows { - t.Fatalf("wrong number of rows copied: want=%d got=%d", totalRows, numRows) - } - - for _, merge := range []struct { - scenario string - rowGroup parquet.RowGroup - }{ - {scenario: "self", rowGroup: merged}, - {scenario: "copy", rowGroup: mergedCopy}, - } { - t.Run(merge.scenario, func(t *testing.T) { - var expectedRows = test.output.Rows() - var mergedRows = merge.rowGroup.Rows() - var row1 = make([]parquet.Row, 1) - var row2 = make([]parquet.Row, 1) - var numRows int64 - - defer expectedRows.Close() - defer mergedRows.Close() - - for { - _, err1 := expectedRows.ReadRows(row1) - n, err2 := mergedRows.ReadRows(row2) - - if err1 != err2 { - // ReadRows may or may not return io.EOF - // when it reads the last row, so we test - // that the reference RowReader has also - // reached the end. - if err1 == nil && err2 == io.EOF { - _, err1 = expectedRows.ReadRows(row1[:0]) - } - if err1 != io.EOF { - t.Fatalf("errors mismatched while comparing row %d/%d: want=%v got=%v", numRows, totalRows, err1, err2) - } - } - - if n != 0 { - if !row1[0].Equal(row2[0]) { - t.Errorf("row at index %d/%d mismatch: want=%+v got=%+v", numRows, totalRows, row1[0], row2[0]) - } - numRows++ - } - - if err1 != nil { - break - } - } - - if numRows != totalRows { - t.Errorf("expected to read %d rows but %d were found", totalRows, numRows) - } - }) - } - - }) - } - }) - } -} - func TestWriteRowGroupClosesRows(t *testing.T) { var rows []*wrappedRows rg := wrappedRowGroup{