From d9940d886c435f75d59aa513bec85f0f08f2156f Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 29 Mar 2016 18:48:01 -0700 Subject: [PATCH 1/5] Public API draft --- cpp/src/arrow/parquet/reader.cc | 29 ++++++++ cpp/src/arrow/parquet/reader.h | 128 ++++++++++++++++++++++++++++++++ cpp/src/arrow/parquet/schema.h | 2 +- 3 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 cpp/src/arrow/parquet/reader.cc create mode 100644 cpp/src/arrow/parquet/reader.h diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc new file mode 100644 index 0000000000000..3b3ea04e11312 --- /dev/null +++ b/cpp/src/arrow/parquet/reader.cc @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/parquet/reader.h" + +namespace arrow { +namespace parquet { + +class FileReader::Impl { + private: + std::unique_ptr<::parquet::ParquetFileReader> reader_; +}; + +} // namespace parquet +} // namespace arrow diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h new file mode 100644 index 0000000000000..bc345662c8658 --- /dev/null +++ b/cpp/src/arrow/parquet/reader.h @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_PARQUET_READER_H +#define ARROW_PARQUET_READER_H + +#include + +#include "parquet/api/reader.h" +#include "parquet/api/schema.h" + +namespace arrow { + +class Array; +class MemoryPool; +class RowBatch; +class Status; + +namespace parquet { + +class FlatColumnReader; + +// Arrow read adapter class for deserializing Parquet files as Arrow row +// batches. +// +// TODO(wesm): nested data does not always make sense with this user +// interface unless you are only reading a single leaf node from a branch of +// a table. 
For example: +// +// repeated group data { +// optional group record { +// optional int32 val1; +// optional byte_array val2; +// optional bool val3; +// } +// optional int32 val4; +// } +// +// In the Parquet file, there are 4 leaf nodes: +// +// * data.record.val1 +// * data.record.val2 +// * data.record.val3 +// * data.val4 +// +// When materializing this data in an Arrow array, we would have: +// +// data: list<struct< +// record: struct< +// val1: int32, +// val2: string (= list<uint8>), +// val3: bool, +// >, +// val4: int32 +// >> +// +// However, in the Parquet format, each leaf node has its own repetition and +// definition levels describing the structure of the intermediate nodes in +// this array structure. Thus, we will need to scan the leaf data for a group +// of leaf nodes part of the same type tree to create a single result Arrow +// nested array structure. +// +// This is additionally complicated by "chunky" repeated fields or very large byte +// arrays +class FileReader { + public: + ArrowReader(MemoryPool* pool, + std::unique_ptr<::parquet::ParquetFileReader> reader); + + // Since the distribution of columns amongst a Parquet file's row groups may + // be uneven (the number of values in each column chunk can be different), we + // provide a column-oriented read interface. The ColumnReader hides the + // details of paging through the file's row groups and yielding + // fully-materialized arrow::Array instances + // + // Returns error status if the column of interest is not flat. + Status GetFlatColumn(int i, std::unique_ptr* out); + + private: + class Impl; + std::unique_ptr impl_; +}; + +// At this point, the column reader is a stream iterator. It only knows how to +// read the next batch of values for a particular column from the file until it +// runs out. +// +// We also do not expose any internal Parquet details, such as row groups. This +// might change in the future. +class FlatColumnReader { + public: + // Scan the next array of the indicated size.
The actual size of the + // returned array may be less than the passed size depending how much data is + // available in the file. + // + // When all the data in the file has been exhausted, the result is set to + // nullptr. + // + // Returns Status::OK on a successful read, including if you have exhausted + // the data available in the file. + Status NextBatch(int batch_size, std::shared_ptr* out); + + private: + class Impl; + std::unique_ptr impl_; + + friend class FileReader; +}; + +} // namespace parquet + +} // namespace arrow + +#endif ARROW_PARQUET_READER_H diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h index bfc7d21138154..5eedb715f2af2 100644 --- a/cpp/src/arrow/parquet/schema.h +++ b/cpp/src/arrow/parquet/schema.h @@ -45,4 +45,4 @@ Status ToParquetSchema( } // namespace arrow -#endif +#endif // ARROW_PARQUET_SCHEMA_H From 8d2db22fe11728d328eef8579c8221c77275cfd8 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 8 May 2016 16:47:47 +0200 Subject: [PATCH 2/5] ARROW-91: Basic Parquet read support --- cpp/src/arrow/parquet/CMakeLists.txt | 4 + cpp/src/arrow/parquet/parquet-reader-test.cc | 120 +++++++++++++++++ cpp/src/arrow/parquet/reader.cc | 132 ++++++++++++++++++- cpp/src/arrow/parquet/reader.h | 16 ++- cpp/src/arrow/parquet/schema.h | 2 +- 5 files changed, 266 insertions(+), 8 deletions(-) create mode 100644 cpp/src/arrow/parquet/parquet-reader-test.cc diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt index 0d5cf263ec3e2..1ae6709652ea5 100644 --- a/cpp/src/arrow/parquet/CMakeLists.txt +++ b/cpp/src/arrow/parquet/CMakeLists.txt @@ -19,6 +19,7 @@ # arrow_parquet : Arrow <-> Parquet adapter set(PARQUET_SRCS + reader.cc schema.cc ) @@ -36,6 +37,9 @@ SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX) ADD_ARROW_TEST(parquet-schema-test) ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet) +ADD_ARROW_TEST(parquet-reader-test) 
+ARROW_TEST_LINK_LIBRARIES(parquet-reader-test arrow_parquet) + # Headers: top level install(FILES DESTINATION include/arrow/parquet) diff --git a/cpp/src/arrow/parquet/parquet-reader-test.cc b/cpp/src/arrow/parquet/parquet-reader-test.cc new file mode 100644 index 0000000000000..bb0ad50caaaf7 --- /dev/null +++ b/cpp/src/arrow/parquet/parquet-reader-test.cc @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "gtest/gtest.h" + +#include "arrow/test-util.h" +#include "arrow/parquet/reader.h" +#include "arrow/types/primitive.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +#include "parquet/api/schema.h" +#include "parquet/column/writer.h" +#include "parquet/file/reader.h" +#include "parquet/file/writer.h" +#include "parquet/util/input.h" +#include "parquet/util/output.h" + +using ParquetBuffer = parquet::Buffer; +using parquet::BufferReader; +using parquet::InMemoryOutputStream; +using parquet::Int64Writer; +using parquet::ParquetFileReader; +using parquet::ParquetFileWriter; +using parquet::RandomAccessSource; +using parquet::Repetition; +using parquet::SchemaDescriptor; +using ParquetType = parquet::Type; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::PrimitiveNode; + +namespace arrow { + +namespace parquet { + +class TestReadParquet : public ::testing::Test { + public: + virtual void SetUp() {} + + std::shared_ptr Int64Schema() { + auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64); + NodePtr node_ = + GroupNode::Make("schema", Repetition::REQUIRED, std::vector({pnode})); + return std::static_pointer_cast(node_); + } + + std::unique_ptr Int64File( + std::vector& values, int num_chunks) { + std::shared_ptr schema = Int64Schema(); + std::shared_ptr sink(new InMemoryOutputStream()); + auto file_writer = ParquetFileWriter::Open(sink, schema); + size_t chunk_size = values.size() / num_chunks; + for (int i = 0; i < num_chunks; i++) { + auto row_group_writer = file_writer->AppendRowGroup(chunk_size); + auto column_writer = static_cast(row_group_writer->NextColumn()); + int64_t* data = values.data() + i * chunk_size; + column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); + column_writer->Close(); + row_group_writer->Close(); + } + file_writer->Close(); + + std::shared_ptr buffer = sink->GetBuffer(); + std::unique_ptr source(new BufferReader(buffer)); + 
return ParquetFileReader::Open(std::move(source)); + } + + private: +}; + +TEST_F(TestReadParquet, SingleColumnInt64) { + std::vector values(100, 128); + std::unique_ptr file_reader = Int64File(values, 1); + arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); + std::unique_ptr column_reader; + ASSERT_OK(reader.GetFlatColumn(0, &column_reader)); + ASSERT_NE(nullptr, column_reader.get()); + std::shared_ptr out; + ASSERT_OK(column_reader->NextBatch(100, &out)); + ASSERT_NE(nullptr, out.get()); + Int64Array* out_array = static_cast(out.get()); + for (size_t i = 0; i < values.size(); i++) { + EXPECT_EQ(values[i], out_array->raw_data()[i]); + } +} + +TEST_F(TestReadParquet, SingleColumnInt64Chunked) { + std::vector values(100, 128); + std::unique_ptr file_reader = Int64File(values, 4); + arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); + std::unique_ptr column_reader; + ASSERT_OK(reader.GetFlatColumn(0, &column_reader)); + ASSERT_NE(nullptr, column_reader.get()); + std::shared_ptr out; + ASSERT_OK(column_reader->NextBatch(100, &out)); + ASSERT_NE(nullptr, out.get()); + Int64Array* out_array = static_cast(out.get()); + for (size_t i = 0; i < values.size(); i++) { + EXPECT_EQ(values[i], out_array->raw_data()[i]); + } +} + +} // namespace parquet + +} // namespace arrow diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 3b3ea04e11312..4c1b43f779638 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -17,13 +17,141 @@ #include "arrow/parquet/reader.h" +#include + +#include "arrow/parquet/schema.h" +#include "arrow/schema.h" +#include "arrow/types/primitive.h" +#include "arrow/util/status.h" + +using parquet::ColumnReader; +using parquet::TypedColumnReader; + namespace arrow { namespace parquet { class FileReader::Impl { + public: + Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); + virtual ~Impl() {} + + Status 
GetFlatColumn(int i, std::unique_ptr* out); + private: + MemoryPool* pool_; std::unique_ptr<::parquet::ParquetFileReader> reader_; }; -} // namespace parquet -} // namespace arrow +class FlatColumnReader::Impl { + public: + Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, + std::queue>&& column_readers); + virtual ~Impl() {} + + Status NextBatch(int batch_size, std::shared_ptr* out); + template + Status TypedReadBatch(int batch_size, std::shared_ptr* out); + + private: + MemoryPool* pool_; + const ::parquet::ColumnDescriptor* descr_; + std::queue> column_readers_; + std::shared_ptr field_; +}; + +FileReader::Impl::Impl( + MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) + : pool_(pool), reader_(std::move(reader)) {} + +Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr* out) { + std::queue> column_readers; + for (int rg = 0; rg < reader_->num_row_groups(); rg++) { + column_readers.push(reader_->RowGroup(rg)->Column(i)); + } + std::unique_ptr impl(new FlatColumnReader::Impl( + pool_, reader_->descr()->Column(i), std::move(column_readers))); + *out = std::unique_ptr(new FlatColumnReader(std::move(impl))); + return Status::OK(); +} + +FileReader::FileReader( + MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) + : impl_(new FileReader::Impl(pool, std::move(reader))) {} + +FileReader::~FileReader() {} + +Status FileReader::GetFlatColumn(int i, std::unique_ptr* out) { + return impl_->GetFlatColumn(i, out); +} + +Status FileReader::ReadFlatColumn(int i, std::shared_ptr* out) { + return Status::OK(); +} + +FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, + std::queue>&& column_readers) + : pool_(pool), descr_(descr), column_readers_(column_readers) { + NodeToField(descr_->schema_node(), &field_); +} + +template +Status FlatColumnReader::Impl::TypedReadBatch( + int batch_size, std::shared_ptr* out) { + int values_to_read = batch_size; + NumericBuilder 
builder(pool_, field_->type); + while ((values_to_read > 0) && (column_readers_.size() > 0)) { + // TODO: This is a lot malloc-thresing and not using the memory pool. + std::vector values(values_to_read); + std::vector def_levels(values_to_read); + auto reader = + dynamic_cast*>(column_readers_.front().get()); + int64_t values_read; + values_to_read -= reader->ReadBatch( + values_to_read, def_levels.data(), nullptr, values.data(), &values_read); + if (descr_->max_definition_level() == 0) { + builder.Append(values.data(), values_read); + } else { + return Status::NotImplemented("no support for definition levels yet"); + } + if (!column_readers_.front()->HasNext()) { column_readers_.pop(); } + } + *out = builder.Finish(); + return Status::OK(); +} + +#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType) \ + case Type::ENUM: \ + return TypedReadBatch(batch_size, out); \ + break; + +Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out) { + if (column_readers_.size() == 0) { + // Exhausted all readers. 
+ *out = std::shared_ptr(nullptr); + } + + if (descr_->max_repetition_level() > 0) { + return Status::NotImplemented("no support for repetition yet"); + } + + *out = std::shared_ptr(nullptr); + switch (field_->type->type) { + TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t) + TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t) + TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float) + TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double) + default: + return Status::NotImplemented(field_->type->ToString()); + } +} + +FlatColumnReader::FlatColumnReader(std::unique_ptr impl) : impl_(std::move(impl)) {} + +FlatColumnReader::~FlatColumnReader() {} + +Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr* out) { + return impl_->NextBatch(batch_size, out); +} + +} // namespace parquet +} // namespace arrow diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index bc345662c8658..84eb6ef505e84 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -78,8 +78,7 @@ class FlatColumnReader; // arrays class FileReader { public: - ArrowReader(MemoryPool* pool, - std::unique_ptr<::parquet::ParquetFileReader> reader); + FileReader(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); // Since the distribution of columns amongst a Parquet file's row groups may // be uneven (the number of values in each column chunk can be different), we @@ -89,6 +88,10 @@ class FileReader { // // Returns error status if the column of interest is not flat. Status GetFlatColumn(int i, std::unique_ptr* out); + // Read column as a whole into an Array. + Status ReadFlatColumn(int i, std::shared_ptr* out); + + virtual ~FileReader(); private: class Impl; @@ -103,6 +106,8 @@ class FileReader { // might change in the future. class FlatColumnReader { public: + virtual ~FlatColumnReader(); + // Scan the next array of the indicated size. 
The actual size of the // returned array may be less than the passed size depending how much data is // available in the file. @@ -117,12 +122,13 @@ class FlatColumnReader { private: class Impl; std::unique_ptr impl_; + FlatColumnReader(std::unique_ptr impl); friend class FileReader; }; -} // namespace parquet +} // namespace parquet -} // namespace arrow +} // namespace arrow -#endif ARROW_PARQUET_READER_H +#endif // ARROW_PARQUET_READER_H diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h index 5eedb715f2af2..ec5f96062e89f 100644 --- a/cpp/src/arrow/parquet/schema.h +++ b/cpp/src/arrow/parquet/schema.h @@ -45,4 +45,4 @@ Status ToParquetSchema( } // namespace arrow -#endif // ARROW_PARQUET_SCHEMA_H +#endif // ARROW_PARQUET_SCHEMA_H From 5fa1026fc7d870523112572a7cd3d7c98596597e Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Tue, 10 May 2016 23:33:53 +0200 Subject: [PATCH 3/5] Incorporate review comments --- cpp/src/arrow/parquet/parquet-reader-test.cc | 8 +- cpp/src/arrow/parquet/reader.cc | 87 ++++++++++++++------ cpp/src/arrow/parquet/schema.cc | 8 +- cpp/src/arrow/parquet/utils.h | 38 +++++++++ 4 files changed, 103 insertions(+), 38 deletions(-) create mode 100644 cpp/src/arrow/parquet/utils.h diff --git a/cpp/src/arrow/parquet/parquet-reader-test.cc b/cpp/src/arrow/parquet/parquet-reader-test.cc index bb0ad50caaaf7..489bbdeb55271 100644 --- a/cpp/src/arrow/parquet/parquet-reader-test.cc +++ b/cpp/src/arrow/parquet/parquet-reader-test.cc @@ -23,12 +23,8 @@ #include "arrow/util/memory-pool.h" #include "arrow/util/status.h" -#include "parquet/api/schema.h" -#include "parquet/column/writer.h" -#include "parquet/file/reader.h" -#include "parquet/file/writer.h" -#include "parquet/util/input.h" -#include "parquet/util/output.h" +#include "parquet/api/reader.h" +#include "parquet/api/writer.h" using ParquetBuffer = parquet::Buffer; using parquet::BufferReader; diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc 
index 4c1b43f779638..481ded5789a71 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -20,6 +20,7 @@ #include #include "arrow/parquet/schema.h" +#include "arrow/parquet/utils.h" #include "arrow/schema.h" #include "arrow/types/primitive.h" #include "arrow/util/status.h" @@ -36,6 +37,7 @@ class FileReader::Impl { virtual ~Impl() {} Status GetFlatColumn(int i, std::unique_ptr* out); + Status ReadFlatColumn(int i, std::shared_ptr* out); private: MemoryPool* pool_; @@ -45,7 +47,7 @@ class FileReader::Impl { class FlatColumnReader::Impl { public: Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, - std::queue>&& column_readers); + ::parquet::ParquetFileReader* reader, int column_index); virtual ~Impl() {} Status NextBatch(int batch_size, std::shared_ptr* out); @@ -53,10 +55,19 @@ class FlatColumnReader::Impl { Status TypedReadBatch(int batch_size, std::shared_ptr* out); private: + void NextRowGroup(); + MemoryPool* pool_; const ::parquet::ColumnDescriptor* descr_; - std::queue> column_readers_; + ::parquet::ParquetFileReader* reader_; + int column_index_; + int next_row_group_; + std::shared_ptr column_reader_; std::shared_ptr field_; + + PoolBuffer values_buffer_; + PoolBuffer def_levels_buffer_; + PoolBuffer rep_levels_buffer_; }; FileReader::Impl::Impl( @@ -64,16 +75,18 @@ FileReader::Impl::Impl( : pool_(pool), reader_(std::move(reader)) {} Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr* out) { - std::queue> column_readers; - for (int rg = 0; rg < reader_->num_row_groups(); rg++) { - column_readers.push(reader_->RowGroup(rg)->Column(i)); - } - std::unique_ptr impl(new FlatColumnReader::Impl( - pool_, reader_->descr()->Column(i), std::move(column_readers))); + std::unique_ptr impl( + new FlatColumnReader::Impl(pool_, reader_->descr()->Column(i), reader_.get(), i)); *out = std::unique_ptr(new FlatColumnReader(std::move(impl))); return Status::OK(); } +Status FileReader::Impl::ReadFlatColumn(int i, 
std::shared_ptr* out) { + std::unique_ptr flat_column_reader; + RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader)); + return flat_column_reader->NextBatch(reader_->num_rows(), out); +} + FileReader::FileReader( MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader) : impl_(new FileReader::Impl(pool, std::move(reader))) {} @@ -85,13 +98,21 @@ Status FileReader::GetFlatColumn(int i, std::unique_ptr* out) } Status FileReader::ReadFlatColumn(int i, std::shared_ptr* out) { - return Status::OK(); + return impl_->ReadFlatColumn(i, out); } FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr, - std::queue>&& column_readers) - : pool_(pool), descr_(descr), column_readers_(column_readers) { + ::parquet::ParquetFileReader* reader, int column_index) + : pool_(pool), + descr_(descr), + reader_(reader), + column_index_(column_index), + next_row_group_(0), + values_buffer_(pool), + def_levels_buffer_(pool), + rep_levels_buffer_(pool) { NodeToField(descr_->schema_node(), &field_); + NextRowGroup(); } template @@ -99,21 +120,28 @@ Status FlatColumnReader::Impl::TypedReadBatch( int batch_size, std::shared_ptr* out) { int values_to_read = batch_size; NumericBuilder builder(pool_, field_->type); - while ((values_to_read > 0) && (column_readers_.size() > 0)) { - // TODO: This is a lot malloc-thresing and not using the memory pool. 
- std::vector values(values_to_read); - std::vector def_levels(values_to_read); - auto reader = - dynamic_cast*>(column_readers_.front().get()); + while ((values_to_read > 0) && column_reader_) { + values_buffer_.Resize(values_to_read * sizeof(CType)); + if (descr_->max_definition_level() > 0) { + def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + if (descr_->max_repetition_level() > 0) { + rep_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + auto reader = dynamic_cast*>(column_reader_.get()); int64_t values_read; - values_to_read -= reader->ReadBatch( - values_to_read, def_levels.data(), nullptr, values.data(), &values_read); + CType* values = reinterpret_cast(values_buffer_.mutable_data()); + PARQUET_CATCH_NOT_OK( + values_to_read -= reader->ReadBatch(values_to_read, + reinterpret_cast(def_levels_buffer_.mutable_data()), + reinterpret_cast(rep_levels_buffer_.mutable_data()), values, + &values_read)); if (descr_->max_definition_level() == 0) { - builder.Append(values.data(), values_read); + RETURN_NOT_OK(builder.Append(values, values_read)); } else { return Status::NotImplemented("no support for definition levels yet"); } - if (!column_readers_.front()->HasNext()) { column_readers_.pop(); } + if (!column_reader_->HasNext()) { NextRowGroup(); } } *out = builder.Finish(); return Status::OK(); @@ -125,16 +153,16 @@ Status FlatColumnReader::Impl::TypedReadBatch( break; Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out) { - if (column_readers_.size() == 0) { - // Exhausted all readers. - *out = std::shared_ptr(nullptr); + if (!column_reader_) { + // Exhausted all row groups. 
+ *out = nullptr; + return Status::OK(); } if (descr_->max_repetition_level() > 0) { return Status::NotImplemented("no support for repetition yet"); } - *out = std::shared_ptr(nullptr); switch (field_->type->type) { TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t) TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t) @@ -145,6 +173,15 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* } } +void FlatColumnReader::Impl::NextRowGroup() { + if (next_row_group_ < reader_->num_row_groups()) { + column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_); + next_row_group_++; + } else { + column_reader_ = nullptr; + } +} + FlatColumnReader::FlatColumnReader(std::unique_ptr impl) : impl_(std::move(impl)) {} FlatColumnReader::~FlatColumnReader() {} diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index 214c764f08b6e..fd758940c9f3a 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -21,13 +21,12 @@ #include #include "parquet/api/schema.h" -#include "parquet/exception.h" +#include "arrow/parquet/utils.h" #include "arrow/types/decimal.h" #include "arrow/types/string.h" #include "arrow/util/status.h" -using parquet::ParquetException; using parquet::Repetition; using parquet::schema::Node; using parquet::schema::NodePtr; @@ -41,11 +40,6 @@ namespace arrow { namespace parquet { -#define PARQUET_CATCH_NOT_OK(s) \ - try { \ - (s); \ - } catch (const ParquetException& e) { return Status::Invalid(e.what()); } - const auto BOOL = std::make_shared(); const auto UINT8 = std::make_shared(); const auto INT32 = std::make_shared(); diff --git a/cpp/src/arrow/parquet/utils.h b/cpp/src/arrow/parquet/utils.h new file mode 100644 index 0000000000000..b32792fdf7030 --- /dev/null +++ b/cpp/src/arrow/parquet/utils.h @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_PARQUET_UTILS_H +#define ARROW_PARQUET_UTILS_H + +#include "arrow/util/status.h" + +#include "parquet/exception.h" + +namespace arrow { + +namespace parquet { + +#define PARQUET_CATCH_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) { return Status::Invalid(e.what()); } + +} // namespace parquet + +} // namespace arrow + +#endif // ARROW_PARQUET_UTILS_H From 47441a1d068df8c5a7e2c1ed91179841df4c1d8e Mon Sep 17 00:00:00 2001 From: "Uwe L. 
Korn" Date: Tue, 10 May 2016 23:39:22 +0200 Subject: [PATCH 4/5] Assert that no exception was thrown --- cpp/src/arrow/parquet/parquet-reader-test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/parquet/parquet-reader-test.cc b/cpp/src/arrow/parquet/parquet-reader-test.cc index 489bbdeb55271..a7fc2a89f5f45 100644 --- a/cpp/src/arrow/parquet/parquet-reader-test.cc +++ b/cpp/src/arrow/parquet/parquet-reader-test.cc @@ -84,7 +84,7 @@ TEST_F(TestReadParquet, SingleColumnInt64) { std::unique_ptr file_reader = Int64File(values, 1); arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); std::unique_ptr column_reader; - ASSERT_OK(reader.GetFlatColumn(0, &column_reader)); + ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); ASSERT_NE(nullptr, column_reader.get()); std::shared_ptr out; ASSERT_OK(column_reader->NextBatch(100, &out)); @@ -100,7 +100,7 @@ TEST_F(TestReadParquet, SingleColumnInt64Chunked) { std::unique_ptr file_reader = Int64File(values, 4); arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); std::unique_ptr column_reader; - ASSERT_OK(reader.GetFlatColumn(0, &column_reader)); + ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); ASSERT_NE(nullptr, column_reader.get()); std::shared_ptr out; ASSERT_OK(column_reader->NextBatch(100, &out)); From 7579fed4086b9d5d3ebf24e042b5be6bce5c601a Mon Sep 17 00:00:00 2001 From: "Uwe L. 
Korn" Date: Tue, 10 May 2016 23:41:32 +0200 Subject: [PATCH 5/5] Mark single argument constructor as explicit --- cpp/src/arrow/parquet/reader.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index 84eb6ef505e84..41ca7eb35b9f0 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -122,7 +122,7 @@ class FlatColumnReader { private: class Impl; std::unique_ptr impl_; - FlatColumnReader(std::unique_ptr impl); + explicit FlatColumnReader(std::unique_ptr impl); friend class FileReader; };