From 7337c150bf747e9f5d74ba944d2ffe873d38a35a Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 24 Jan 2017 08:42:03 +0100 Subject: [PATCH] PARQUET-836: Bugfix + testcase for column subsetting in arrow::FileReader::ReadFlatTable Also adds an option to set the random seed when generating data for the Arrow test cases Author: Wes McKinney Closes #223 from wesm/PARQUET-836 and squashes the following commits: 673cc0d [Wes McKinney] Add randomness option to Arrow data generation. Test case for FileReader::ReadFlatTable with column subsetting Change-Id: Ia5577f31157d2c05f9e9e84faf95fd69ac602a86 --- .../parquet/arrow/arrow-reader-writer-test.cc | 106 +++++++++++++----- cpp/src/parquet/arrow/reader.cc | 2 +- cpp/src/parquet/arrow/test-util.h | 29 ++++- 3 files changed, 99 insertions(+), 38 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 6748a8dae067e..d681e5763c86e 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -32,6 +32,7 @@ using arrow::Array; using arrow::Buffer; +using arrow::Column; using arrow::ChunkedArray; using arrow::default_memory_pool; using arrow::io::BufferReader; @@ -51,6 +52,8 @@ namespace arrow { const int SMALL_SIZE = 100; const int LARGE_SIZE = 10000; +constexpr uint32_t kDefaultSeed = 0; + template struct test_traits {}; @@ -185,23 +188,6 @@ using ParquetDataType = DataType::parquet_enum>; template using ParquetWriter = TypedColumnWriter>; -void DoTableRoundtrip( - const std::shared_ptr& table, int num_threads, std::shared_ptr
* out) { - auto sink = std::make_shared(); - - ASSERT_OK_NO_THROW(WriteFlatTable( - table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); - - std::shared_ptr buffer = sink->GetBuffer(); - std::unique_ptr reader; - ASSERT_OK_NO_THROW( - OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); - - reader->set_num_threads(num_threads); - ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); -} - template class TestParquetIO : public ::testing::Test { public: @@ -324,7 +310,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { // This also tests max_definition_level = 1 std::shared_ptr values; - ASSERT_OK(NullableArray(SMALL_SIZE, 10, &values)); + + ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); this->WriteFlatColumn(schema, values); @@ -335,7 +322,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { // This also tests max_definition_level = 1 std::shared_ptr values; - ASSERT_OK(NullableArray(SMALL_SIZE, 10, &values)); + + ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr
table = MakeSimpleTable(values, true); this->sink_ = std::make_shared(); ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), @@ -407,7 +395,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { int64_t chunk_size = SMALL_SIZE / 4; std::shared_ptr values; - ASSERT_OK(NullableArray(SMALL_SIZE, 10, &values)); + + ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr schema = this->MakeSchema(Repetition::OPTIONAL); FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema)); @@ -424,7 +413,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { // This also tests max_definition_level = 1 std::shared_ptr values; - ASSERT_OK(NullableArray(LARGE_SIZE, 100, &values)); + + ASSERT_OK(NullableArray(LARGE_SIZE, 100, kDefaultSeed, &values)); std::shared_ptr
table = MakeSimpleTable(values, true); this->sink_ = std::make_shared(); ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(), @@ -490,7 +480,8 @@ using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>; TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { // This also tests max_definition_level = 1 std::shared_ptr values; - ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, &values)); + + ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &values)); std::shared_ptr
table = MakeSimpleTable(values, true); // Parquet 2.0 roundtrip should yield an uint32_t column again @@ -507,7 +498,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) { // This also tests max_definition_level = 1 std::shared_ptr arr; - ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, &arr)); + ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &arr)); std::shared_ptr<::arrow::UInt32Array> values = std::dynamic_pointer_cast<::arrow::UInt32Array>(arr); @@ -660,18 +651,15 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { this->CheckSingleColumnRequiredTableRead(4); } -TEST(TestArrowReadWrite, MultithreadedRead) { - const int num_columns = 20; - const int num_rows = 1000; - const int num_threads = 4; - +void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr
* out) { std::shared_ptr<::arrow::Column> column; std::vector> columns(num_columns); std::vector> fields(num_columns); std::shared_ptr values; for (int i = 0; i < num_columns; ++i) { - ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values)); + ASSERT_OK(NullableArray<::arrow::DoubleType>( + num_rows, num_rows / 10, static_cast(i), &values)); std::stringstream ss; ss << "col" << i; column = MakeColumn(ss.str(), values, true); @@ -680,13 +668,69 @@ TEST(TestArrowReadWrite, MultithreadedRead) { fields[i] = column->field(); } auto schema = std::make_shared<::arrow::Schema>(fields); - auto table = std::make_shared
("schema", schema, columns); + *out = std::make_shared
("schema", schema, columns); +} + +void DoTableRoundtrip(const std::shared_ptr
& table, int num_threads, + const std::vector& column_subset, std::shared_ptr
* out) { + auto sink = std::make_shared(); + + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); + + std::shared_ptr buffer = sink->GetBuffer(); + std::unique_ptr reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + reader->set_num_threads(num_threads); + + if (column_subset.size() > 0) { + ASSERT_OK_NO_THROW(reader->ReadFlatTable(column_subset, out)); + } else { + // Read everything + ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); + } +} + +TEST(TestArrowReadWrite, MultithreadedRead) { + const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; + + std::shared_ptr
table; + MakeDoubleTable(num_columns, num_rows, &table); std::shared_ptr
result; - DoTableRoundtrip(table, num_threads, &result); + DoTableRoundtrip(table, num_threads, {}, &result); ASSERT_TRUE(table->Equals(result)); } +TEST(TestArrowReadWrite, ReadColumnSubset) { + const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; + + std::shared_ptr
table; + MakeDoubleTable(num_columns, num_rows, &table); + + std::shared_ptr
result; + std::vector column_subset = {0, 4, 8, 10}; + DoTableRoundtrip(table, num_threads, column_subset, &result); + + std::vector> ex_columns; + std::vector> ex_fields; + for (int i : column_subset) { + ex_columns.push_back(table->column(i)); + ex_fields.push_back(table->column(i)->field()); + } + + auto ex_schema = std::make_shared<::arrow::Schema>(ex_fields); + auto expected = std::make_shared
("schema", ex_schema, ex_columns); + ASSERT_TRUE(result->Equals(expected)); +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 9221041c78592..a60d0b2753351 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -222,7 +222,7 @@ Status FileReader::Impl::ReadFlatTable( auto ReadColumn = [&indices, &schema, &columns, this](int i) { std::shared_ptr array; RETURN_NOT_OK(ReadFlatColumn(indices[i], &array)); - columns[i] = std::make_shared(schema->field(indices[i]), array); + columns[i] = std::make_shared(schema->field(i), array); return Status::OK(); }; diff --git a/cpp/src/parquet/arrow/test-util.h b/cpp/src/parquet/arrow/test-util.h index f996c2c11779c..4760f0e8e7e24 100644 --- a/cpp/src/parquet/arrow/test-util.h +++ b/cpp/src/parquet/arrow/test-util.h @@ -90,9 +90,10 @@ typename std::enable_if::value, Status>::type NonNullAr // This helper function only supports (size/2) nulls. template typename std::enable_if::value, Status>::type NullableArray( - size_t size, size_t num_nulls, std::shared_ptr* out) { + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr* out) { std::vector values; - ::arrow::test::random_real(size, 0, 0, 1, &values); + ::arrow::test::random_real( + size, seed, -1e10, 1e10, &values); std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -108,8 +109,11 @@ typename std::enable_if::value, Status>::type Nullable // This helper function only supports (size/2) nulls. template typename std::enable_if::value, Status>::type NullableArray( - size_t size, size_t num_nulls, std::shared_ptr* out) { + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr* out) { std::vector values; + + // Seed is random in Arrow right now + (void)seed; ::arrow::test::randint(size, 0, 64, &values); std::vector valid_bytes(size, 1); @@ -127,7 +131,8 @@ typename std::enable_if::value, Status>::type NullableAr template typename std::enable_if< is_arrow_string::value || is_arrow_binary::value, Status>::type -NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* out) { +NullableArray( + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { std::vector valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -136,8 +141,16 @@ NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* ou using BuilderType = typename ::arrow::TypeTraits::BuilderType; BuilderType builder(::arrow::default_memory_pool(), std::make_shared()); + + const int kBufferSize = 10; + uint8_t buffer[kBufferSize]; for (size_t i = 0; i < size; i++) { - builder.Append("test-string"); + if (!valid_bytes[i]) { + builder.AppendNull(); + } else { + ::arrow::test::random_bytes(kBufferSize, seed + i, buffer); + builder.Append(buffer, kBufferSize); + } } return builder.Finish(out); } @@ -145,8 +158,12 @@ NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* ou // This helper function only supports (size/2) nulls yet. template typename std::enable_if::value, Status>::type NullableArray( - size_t size, size_t num_nulls, std::shared_ptr* out) { + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr* out) { std::vector values; + + // Seed is random in Arrow right now + (void)seed; + ::arrow::test::randint(size, 0, 1, &values); std::vector valid_bytes(size, 1);