Skip to content

Commit

Permalink
PARQUET-836: Bugfix + testcase for column subsetting in arrow::FileReader::ReadFlatTable
Browse files Browse the repository at this point in the history

Also adds an option to set the random seed when generating data for the Arrow test cases

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes apache#223 from wesm/PARQUET-836 and squashes the following commits:

673cc0d [Wes McKinney] Add randomness option to Arrow data generation. Test case for FileReader::ReadFlatTable with column subsetting

Change-Id: Ia5577f31157d2c05f9e9e84faf95fd69ac602a86
  • Loading branch information
wesm authored and xhochy committed Jan 24, 2017
1 parent f24bb88 commit 7337c15
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 38 deletions.
106 changes: 75 additions & 31 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

using arrow::Array;
using arrow::Buffer;
using arrow::Column;
using arrow::ChunkedArray;
using arrow::default_memory_pool;
using arrow::io::BufferReader;
Expand All @@ -51,6 +52,8 @@ namespace arrow {
const int SMALL_SIZE = 100;
const int LARGE_SIZE = 10000;

constexpr uint32_t kDefaultSeed = 0;

template <typename TestType>
struct test_traits {};

Expand Down Expand Up @@ -185,23 +188,6 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;

void DoTableRoundtrip(
const std::shared_ptr<Table>& table, int num_threads, std::shared_ptr<Table>* out) {
auto sink = std::make_shared<InMemoryOutputStream>();

ASSERT_OK_NO_THROW(WriteFlatTable(
table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));

std::shared_ptr<Buffer> buffer = sink->GetBuffer();
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(
OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
::parquet::default_reader_properties(), nullptr, &reader));

reader->set_num_threads(num_threads);
ASSERT_OK_NO_THROW(reader->ReadFlatTable(out));
}

template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
Expand Down Expand Up @@ -324,7 +310,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
// This also tests max_definition_level = 1
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values));

ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
this->WriteFlatColumn(schema, values);
Expand All @@ -335,7 +322,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
// This also tests max_definition_level = 1
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values));

ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
this->sink_ = std::make_shared<InMemoryOutputStream>();
ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
Expand Down Expand Up @@ -407,7 +395,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
int64_t chunk_size = SMALL_SIZE / 4;
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values));

ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
Expand All @@ -424,7 +413,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
// This also tests max_definition_level = 1
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, &values));

ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
std::shared_ptr<Table> table = MakeSimpleTable(values, true);
this->sink_ = std::make_shared<InMemoryOutputStream>();
ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
Expand Down Expand Up @@ -490,7 +480,8 @@ using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
// This also tests max_definition_level = 1
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, &values));

ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &values));
std::shared_ptr<Table> table = MakeSimpleTable(values, true);

// Parquet 2.0 roundtrip should yield an uint32_t column again
Expand All @@ -507,7 +498,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
// This also tests max_definition_level = 1
std::shared_ptr<Array> arr;
ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, &arr));
ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &arr));

std::shared_ptr<::arrow::UInt32Array> values =
std::dynamic_pointer_cast<::arrow::UInt32Array>(arr);
Expand Down Expand Up @@ -660,18 +651,15 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
this->CheckSingleColumnRequiredTableRead(4);
}

TEST(TestArrowReadWrite, MultithreadedRead) {
const int num_columns = 20;
const int num_rows = 1000;
const int num_threads = 4;

void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Column> column;
std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);

std::shared_ptr<Array> values;
for (int i = 0; i < num_columns; ++i) {
ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values));
ASSERT_OK(NullableArray<::arrow::DoubleType>(
num_rows, num_rows / 10, static_cast<uint32_t>(i), &values));
std::stringstream ss;
ss << "col" << i;
column = MakeColumn(ss.str(), values, true);
Expand All @@ -680,13 +668,69 @@ TEST(TestArrowReadWrite, MultithreadedRead) {
fields[i] = column->field();
}
auto schema = std::make_shared<::arrow::Schema>(fields);
auto table = std::make_shared<Table>("schema", schema, columns);
*out = std::make_shared<Table>("schema", schema, columns);
}

// Writes `table` to an in-memory Parquet file (split into two row groups),
// then reads it back with `num_threads` deserialization threads. When
// `column_subset` is non-empty, only those column indices are read; an
// empty subset reads the full table.
void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
    const std::vector<int>& column_subset, std::shared_ptr<Table>* out) {
  auto stream = std::make_shared<InMemoryOutputStream>();

  // Two row groups: half the rows each (rounded up)
  ASSERT_OK_NO_THROW(WriteFlatTable(
      table.get(), ::arrow::default_memory_pool(), stream, (table->num_rows() + 1) / 2));

  std::shared_ptr<Buffer> serialized = stream->GetBuffer();
  std::unique_ptr<FileReader> reader;
  ASSERT_OK_NO_THROW(
      OpenFile(std::make_shared<BufferReader>(serialized), ::arrow::default_memory_pool(),
          ::parquet::default_reader_properties(), nullptr, &reader));

  reader->set_num_threads(num_threads);

  if (column_subset.empty()) {
    // No subset requested: read everything
    ASSERT_OK_NO_THROW(reader->ReadFlatTable(out));
  } else {
    ASSERT_OK_NO_THROW(reader->ReadFlatTable(column_subset, out));
  }
}

// Sanity check: a full write/read roundtrip using 4 reader threads must
// reproduce the original table exactly.
TEST(TestArrowReadWrite, MultithreadedRead) {
  const int num_columns = 20;
  const int num_rows = 1000;
  const int num_threads = 4;

  std::shared_ptr<Table> table;
  MakeDoubleTable(num_columns, num_rows, &table);

  std::shared_ptr<Table> result;
  // Empty column subset => read all columns
  DoTableRoundtrip(table, num_threads, {}, &result);

  ASSERT_TRUE(table->Equals(result));
}

// Roundtrip reading only a subset of column indices; the result must equal
// a table assembled from exactly those columns of the original.
TEST(TestArrowReadWrite, ReadColumnSubset) {
  const int num_columns = 20;
  const int num_rows = 1000;
  const int num_threads = 4;

  std::shared_ptr<Table> table;
  MakeDoubleTable(num_columns, num_rows, &table);

  const std::vector<int> subset_indices = {0, 4, 8, 10};
  std::shared_ptr<Table> result;
  DoTableRoundtrip(table, num_threads, subset_indices, &result);

  // Build the expected table from the selected columns of the source table
  std::vector<std::shared_ptr<::arrow::Column>> expected_columns;
  std::vector<std::shared_ptr<::arrow::Field>> expected_fields;
  for (const int idx : subset_indices) {
    const auto& col = table->column(idx);
    expected_columns.push_back(col);
    expected_fields.push_back(col->field());
  }

  auto expected_schema = std::make_shared<::arrow::Schema>(expected_fields);
  auto expected = std::make_shared<Table>("schema", expected_schema, expected_columns);
  ASSERT_TRUE(result->Equals(expected));
}

} // namespace arrow
} // namespace parquet
2 changes: 1 addition & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ Status FileReader::Impl::ReadFlatTable(
auto ReadColumn = [&indices, &schema, &columns, this](int i) {
std::shared_ptr<Array> array;
RETURN_NOT_OK(ReadFlatColumn(indices[i], &array));
columns[i] = std::make_shared<Column>(schema->field(indices[i]), array);
columns[i] = std::make_shared<Column>(schema->field(i), array);
return Status::OK();
};

Expand Down
29 changes: 23 additions & 6 deletions cpp/src/parquet/arrow/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullAr
// This helper function only supports (size/2) nulls.
template <typename ArrowType>
typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;
::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
::arrow::test::random_real<typename ArrowType::c_type>(
size, seed, -1e10, 1e10, &values);
std::vector<uint8_t> valid_bytes(size, 1);

for (size_t i = 0; i < num_nulls; i++) {
Expand All @@ -108,8 +109,11 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
// This helper function only supports (size/2) nulls.
template <typename ArrowType>
typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;

// Seed is random in Arrow right now
(void)seed;
::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
std::vector<uint8_t> valid_bytes(size, 1);

Expand All @@ -127,7 +131,8 @@ typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableAr
template <typename ArrowType>
typename std::enable_if<
is_arrow_string<ArrowType>::value || is_arrow_binary<ArrowType>::value, Status>::type
NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* out) {
NullableArray(
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) {
std::vector<uint8_t> valid_bytes(size, 1);

for (size_t i = 0; i < num_nulls; i++) {
Expand All @@ -136,17 +141,29 @@ NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* ou

using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
BuilderType builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>());

const int kBufferSize = 10;
uint8_t buffer[kBufferSize];
for (size_t i = 0; i < size; i++) {
builder.Append("test-string");
if (!valid_bytes[i]) {
builder.AppendNull();
} else {
::arrow::test::random_bytes(kBufferSize, seed + i, buffer);
builder.Append(buffer, kBufferSize);
}
}
return builder.Finish(out);
}

// This helper function only supports (size/2) nulls yet.
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
std::vector<uint8_t> values;

// Seed is random in Arrow right now
(void)seed;

::arrow::test::randint<uint8_t>(size, 0, 1, &values);
std::vector<uint8_t> valid_bytes(size, 1);

Expand Down

0 comments on commit 7337c15

Please sign in to comment.