Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43994: [C++][Parquet] Fix schema conversion from two-level encoding nested list #43995

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4093,6 +4093,62 @@ TEST(TestArrowReaderAdHoc, OldDataPageV2) {
TryReadDataFile(path);
}

TEST(TestArrowReaderAdHoc, LegacyTwoLevelList) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature is supported by parquet-java via a writer flag parquet.avro.write-old-list-structure: https://github.com/apache/parquet-java/blob/08a4e7e6279f8c3b8558bd294fee4489a96d0db1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java#L69 so I don't think we need to add a test file to parquet-testing repo.

Instead, I added a roundtrip test to create the file by C++ Parquet writer and then read it via the C++ Arrow Parquet reader. Does it look good to you? @pitrou @mapleFU

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we can't invoke parquet-java from the C++ test suite, so that's a good reason for a file to reside in parquet-testing (also for other implementations). Is there any problem with that?

// Create schema with a nested list of two-level encoding:
constexpr std::string_view kExpectedSchema =
"required group field_id=-1 schema {\n"
" optional group field_id=-1 nested_list (List) {\n"
" repeated group field_id=-1 array (List) {\n"
" repeated int32 field_id=-1 array;\n"
" }\n"
" }\n"
"}\n";
auto inner_element = PrimitiveNode::Make("array", Repetition::REPEATED, Type::INT32);
auto outer_element = GroupNode::Make("array", Repetition::REPEATED, {inner_element},
ConvertedType::LIST);
auto nested_list = GroupNode::Make("nested_list", Repetition::OPTIONAL, {outer_element},
ConvertedType::LIST);
auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, {nested_list});

// Create a Parquet writer to write values of nested list
auto sink = CreateOutputStream();
auto file_writer = ParquetFileWriter::Open(
sink, std::dynamic_pointer_cast<schema::GroupNode>(schema_node));
auto row_group_writer = file_writer->AppendRowGroup();
auto int_writer = dynamic_cast<Int32Writer*>(row_group_writer->NextColumn());
ASSERT_TRUE(int_writer != nullptr);

// Directly write a single row of nested list: [[1, 2],[3, 4]]
constexpr int64_t kNumValues = 4;
std::array<int16_t, kNumValues> rep_levels = {0, 2, 1, 2};
std::array<int16_t, kNumValues> def_levels = {3, 3, 3, 3};
std::array<int32_t, kNumValues> values = {1, 2, 3, 4};
int_writer->WriteBatch(kNumValues, def_levels.data(), rep_levels.data(), values.data());
file_writer->Close();

// Read schema and verify it applies two-level encoding of list type
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
auto file_reader = ParquetFileReader::Open(source);
ASSERT_EQ(kExpectedSchema, file_reader->metadata()->schema()->ToString());

// Read and verify data
std::unique_ptr<FileReader> reader;
ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(file_reader), &reader));
std::shared_ptr<Table> table;
ASSERT_OK(reader->ReadTable(&table));

auto arrow_inner_element =
::arrow::field("array", ::arrow::int32(), /*nullable=*/false);
auto arrow_outer_element =
::arrow::field("array", ::arrow::list(arrow_inner_element), /*nullable=*/false);
auto arrow_list = ::arrow::list(arrow_outer_element);
auto arrow_schema =
::arrow::schema({::arrow::field("nested_list", arrow_list, /*nullable=*/true)});
auto expected_table = ::arrow::TableFromJSON(arrow_schema, {R"([[[[1,2],[3,4]]]])"});
::arrow::AssertTablesEqual(*expected_table, *table);
}

class TestArrowReaderAdHocSparkAndHvr
: public ::testing::TestWithParam<
std::tuple<std::string, std::shared_ptr<DataType>>> {};
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/parquet/arrow/arrow_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,26 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
arrow_fields.push_back(::arrow::field("name", arrow_list, false));
}

// Two-level encoding List<List<Integer>>:
// optional group nested_list (LIST) {
// repeated group array (LIST) {
// repeated int32 array;
// }
// }
{
auto inner_element =
PrimitiveNode::Make("array", Repetition::REPEATED, ParquetType::INT32);
auto outer_element = GroupNode::Make("array", Repetition::REPEATED, {inner_element},
ConvertedType::LIST);
parquet_fields.push_back(GroupNode::Make("nested_list", Repetition::OPTIONAL,
{outer_element}, ConvertedType::LIST));
auto arrow_inner_element = ::arrow::field("array", INT32, /*nullable=*/false);
auto arrow_outer_element =
::arrow::field("array", ::arrow::list(arrow_inner_element), /*nullable=*/false);
auto arrow_list = ::arrow::list(arrow_outer_element);
arrow_fields.push_back(::arrow::field("nested_list", arrow_list, true));
}

auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,10 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
// List of primitive type
RETURN_NOT_OK(
NodeToSchemaField(*list_group.field(0), current_levels, ctx, out, child_field));
} else if (list_group.field_count() == 1 && list_group.field(0)->is_repeated()) {
// Special case for nested list in two-level list encoding
RETURN_NOT_OK(
NodeToSchemaField(*list_group.field(0), current_levels, ctx, out, child_field));
} else {
RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
}
Expand Down
Loading