diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 57986de812dc0..6748a8dae067e 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -17,6 +17,8 @@ #include "gtest/gtest.h" +#include + #include "parquet/api/reader.h" #include "parquet/api/writer.h" @@ -44,7 +46,6 @@ using parquet::schema::NodePtr; using parquet::schema::PrimitiveNode; namespace parquet { - namespace arrow { const int SMALL_SIZE = 100; @@ -184,6 +185,23 @@ using ParquetDataType = DataType::parquet_enum>; template using ParquetWriter = TypedColumnWriter>; +void DoTableRoundtrip( + const std::shared_ptr& table, int num_threads, std::shared_ptr
* out) { + auto sink = std::make_shared(); + + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); + + std::shared_ptr buffer = sink->GetBuffer(); + std::unique_ptr reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + reader->set_num_threads(num_threads); + ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); +} + template class TestParquetIO : public ::testing::Test { public: @@ -642,6 +660,33 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { this->CheckSingleColumnRequiredTableRead(4); } -} // namespace arrow +TEST(TestArrowReadWrite, MultithreadedRead) { + const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; + + std::shared_ptr<::arrow::Column> column; + std::vector> columns(num_columns); + std::vector> fields(num_columns); + + std::shared_ptr values; + for (int i = 0; i < num_columns; ++i) { + ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values)); + std::stringstream ss; + ss << "col" << i; + column = MakeColumn(ss.str(), values, true); + + columns[i] = column; + fields[i] = column->field(); + } + auto schema = std::make_shared<::arrow::Schema>(fields); + auto table = std::make_shared
("schema", schema, columns); + std::shared_ptr
result; + DoTableRoundtrip(table, num_threads, &result); + + ASSERT_TRUE(table->Equals(result)); +} + +} // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index c9f986a0c1969..9221041c78592 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -18,9 +18,12 @@ #include "parquet/arrow/reader.h" #include +#include #include +#include #include #include +#include #include #include "parquet/arrow/schema.h" @@ -65,11 +68,17 @@ class FileReader::Impl { Status GetFlatColumn(int i, std::unique_ptr* out); Status ReadFlatColumn(int i, std::shared_ptr* out); Status ReadFlatTable(std::shared_ptr
* out); + Status ReadFlatTable( + const std::vector& column_indices, std::shared_ptr
* out); const ParquetFileReader* parquet_reader() const { return reader_.get(); } + void set_num_threads(int num_threads) { num_threads_ = num_threads; } + private: MemoryPool* pool_; std::unique_ptr reader_; + + int num_threads_; }; class FlatColumnReader::Impl { @@ -125,7 +134,7 @@ class FlatColumnReader::Impl { }; FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr reader) - : pool_(pool), reader_(std::move(reader)) {} + : pool_(pool), reader_(std::move(reader)), num_threads_(1) {} bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) { if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) { @@ -156,19 +165,73 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr* out) { } Status FileReader::Impl::ReadFlatTable(std::shared_ptr
* table) { + std::vector column_indices(reader_->metadata()->num_columns()); + + for (size_t i = 0; i < column_indices.size(); ++i) { + column_indices[i] = i; + } + return ReadFlatTable(column_indices, table); +} + +template +Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) { + std::vector thread_pool; + thread_pool.reserve(nthreads); + std::atomic task_counter(0); + + std::mutex error_mtx; + bool error_occurred = false; + Status error; + + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() { + int task_id; + while (!error_occurred) { + task_id = task_counter.fetch_add(1); + if (task_id >= num_tasks) { break; } + Status s = func(task_id); + if (!s.ok()) { + std::lock_guard lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); + } + if (error_occurred) { return error; } + return Status::OK(); +} + +Status FileReader::Impl::ReadFlatTable( + const std::vector& indices, std::shared_ptr
* table) { auto descr = reader_->metadata()->schema(); const std::string& name = descr->name(); std::shared_ptr<::arrow::Schema> schema; - RETURN_NOT_OK(FromParquetSchema(descr, &schema)); - - int num_columns = reader_->metadata()->num_columns(); + RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema)); + int num_columns = static_cast(indices.size()); + int nthreads = std::min(num_threads_, num_columns); std::vector> columns(num_columns); - for (int i = 0; i < num_columns; i++) { + + auto ReadColumn = [&indices, &schema, &columns, this](int i) { std::shared_ptr array; - RETURN_NOT_OK(ReadFlatColumn(i, &array)); - columns[i] = std::make_shared(schema->field(i), array); + RETURN_NOT_OK(ReadFlatColumn(indices[i], &array)); + columns[i] = std::make_shared(schema->field(indices[i]), array); + return Status::OK(); + }; + + if (nthreads == 1) { + for (int i = 0; i < num_columns; i++) { + RETURN_NOT_OK(ReadColumn(i)); + } + } else { + RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumn)); } *table = std::make_shared
(name, schema, columns); @@ -218,6 +281,19 @@ Status FileReader::ReadFlatTable(std::shared_ptr
* out) { } } +Status FileReader::ReadFlatTable( + const std::vector& column_indices, std::shared_ptr
* out) { + try { + return impl_->ReadFlatTable(column_indices, out); + } catch (const ::parquet::ParquetException& e) { + return ::arrow::Status::IOError(e.what()); + } +} + +void FileReader::set_num_threads(int num_threads) { + impl_->set_num_threads(num_threads); +} + const ParquetFileReader* FileReader::parquet_reader() const { return impl_->parquet_reader(); } diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index 518ae4b3dd9d9..934b826d9ac79 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -19,6 +19,7 @@ #define PARQUET_ARROW_READER_H #include +#include #include "parquet/api/reader.h" #include "parquet/api/schema.h" @@ -94,13 +95,24 @@ class PARQUET_EXPORT FileReader { // // Returns error status if the column of interest is not flat. ::arrow::Status GetFlatColumn(int i, std::unique_ptr* out); + // Read column as a whole into an Array. ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out); + // Read a table of flat columns into a Table. ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out); + // Read a table of flat columns into a Table. Read only the indicated column + // indices (relative to the schema) + ::arrow::Status ReadFlatTable( + const std::vector& column_indices, std::shared_ptr<::arrow::Table>* out); + const ParquetFileReader* parquet_reader() const; + /// Set the number of threads to use during reads of multiple columns. By + /// default only 1 thread is used + void set_num_threads(int num_threads); + virtual ~FileReader(); private: diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index b086b9e6f42f2..4f17f5e86954b 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -250,8 +250,6 @@ Status NodeToField(const NodePtr& node, std::shared_ptr* out) { Status FromParquetSchema( const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) { - // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes - // from the root Parquet node const GroupNode* schema_node = parquet_schema->group_node(); std::vector> fields(schema_node->field_count()); @@ -263,6 +261,23 @@ Status FromParquetSchema( return Status::OK(); } +Status FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::vector& column_indices, std::shared_ptr<::arrow::Schema>* out) { + // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes + // from the root Parquet node + const GroupNode* schema_node = parquet_schema->group_node(); + + int num_fields = static_cast(column_indices.size()); + + std::vector> fields(num_fields); + for (int i = 0; i < num_fields; i++) { + RETURN_NOT_OK(NodeToField(schema_node->field(column_indices[i]), &fields[i])); + } + + *out = std::make_shared<::arrow::Schema>(fields); + return Status::OK(); +} + Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name, bool nullable, const WriterProperties& properties, NodePtr* out) { Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED; diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h index 6917b9051f81c..bb77a4e0ddcdf 100644 --- a/cpp/src/parquet/arrow/schema.h +++ b/cpp/src/parquet/arrow/schema.h @@ -19,6 +19,7 @@ #define PARQUET_ARROW_SCHEMA_H #include +#include #include "arrow/schema.h" #include "arrow/type.h" @@ -40,6 +41,9 @@ namespace arrow { ::arrow::Status PARQUET_EXPORT NodeToField( const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out); +::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::vector& column_indices, std::shared_ptr<::arrow::Schema>* out); + ::arrow::Status PARQUET_EXPORT FromParquetSchema( const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);