PARQUET-835: Read Arrow columns in parallel with thread pool
Also implements PARQUET-836, but a unit test for that still needs to be added

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes apache#222 from wesm/PARQUET-835 and squashes the following commits:

71c700e [Wes McKinney] Add missing include. Update Arrow version
638b4c0 [Wes McKinney] cpplint
7c79ca7 [Wes McKinney] Read Arrow columns in parallel with thread pool

Change-Id: I073f3fa2e49384f706597a343688db04e04843e4
  • Loading branch information
wesm committed Jan 23, 2017
1 parent c7825a3 commit 1eba579
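
For orientation, here is a minimal usage sketch of the feature this commit adds, condensed from the DoTableRoundtrip helper in the test diff below. The include paths, the parquet::arrow qualification of OpenFile, and the caller-supplied buffer of Parquet data are assumptions for illustration, not part of the commit.

// Hedged sketch: read an entire in-memory Parquet file into an arrow::Table
// using 4 decoding threads (PARQUET-835). Mirrors the calls used by
// DoTableRoundtrip in the test below; header paths are assumptions, and other
// arrow headers (Buffer, Table, MemoryPool) are assumed to come in transitively.
#include <memory>

#include "arrow/io/memory.h"       // assumed location of arrow::io::BufferReader
#include "parquet/arrow/reader.h"  // assumed location of FileReader and OpenFile

::arrow::Status ReadTableInParallel(const std::shared_ptr<::arrow::Buffer>& data,
    std::shared_ptr<::arrow::Table>* out) {
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ::arrow::Status s = parquet::arrow::OpenFile(
      std::make_shared<::arrow::io::BufferReader>(data), ::arrow::default_memory_pool(),
      ::parquet::default_reader_properties(), nullptr, &reader);
  if (!s.ok()) { return s; }

  // PARQUET-835: columns are decoded by a pool of 4 threads instead of serially.
  reader->set_num_threads(4);
  return reader->ReadFlatTable(out);
}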
Showing 5 changed files with 163 additions and 11 deletions.
49 changes: 47 additions & 2 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -17,6 +17,8 @@

#include "gtest/gtest.h"

#include <sstream>

#include "parquet/api/reader.h"
#include "parquet/api/writer.h"

@@ -44,7 +46,6 @@ using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;

namespace parquet {

namespace arrow {

const int SMALL_SIZE = 100;
@@ -184,6 +185,23 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;

void DoTableRoundtrip(
    const std::shared_ptr<Table>& table, int num_threads, std::shared_ptr<Table>* out) {
  auto sink = std::make_shared<InMemoryOutputStream>();

  ASSERT_OK_NO_THROW(WriteFlatTable(
      table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));

  std::shared_ptr<Buffer> buffer = sink->GetBuffer();
  std::unique_ptr<FileReader> reader;
  ASSERT_OK_NO_THROW(
      OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
          ::parquet::default_reader_properties(), nullptr, &reader));

  reader->set_num_threads(num_threads);
  ASSERT_OK_NO_THROW(reader->ReadFlatTable(out));
}

template <typename TestType>
class TestParquetIO : public ::testing::Test {
 public:
@@ -642,6 +660,33 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
  this->CheckSingleColumnRequiredTableRead(4);
}

} // namespace arrow
TEST(TestArrowReadWrite, MultithreadedRead) {
  const int num_columns = 20;
  const int num_rows = 1000;
  const int num_threads = 4;

  std::shared_ptr<::arrow::Column> column;
  std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
  std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);

  std::shared_ptr<Array> values;
  for (int i = 0; i < num_columns; ++i) {
    ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values));
    std::stringstream ss;
    ss << "col" << i;
    column = MakeColumn(ss.str(), values, true);

    columns[i] = column;
    fields[i] = column->field();
  }
  auto schema = std::make_shared<::arrow::Schema>(fields);
  auto table = std::make_shared<Table>("schema", schema, columns);

  std::shared_ptr<Table> result;
  DoTableRoundtrip(table, num_threads, &result);

  ASSERT_TRUE(table->Equals(result));
}

} // namespace arrow
} // namespace parquet
90 changes: 83 additions & 7 deletions cpp/src/parquet/arrow/reader.cc
@@ -18,9 +18,12 @@
#include "parquet/arrow/reader.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include "parquet/arrow/schema.h"
@@ -65,11 +68,17 @@ class FileReader::Impl {
  Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
  Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
  Status ReadFlatTable(std::shared_ptr<Table>* out);
  Status ReadFlatTable(
      const std::vector<int>& column_indices, std::shared_ptr<Table>* out);
  const ParquetFileReader* parquet_reader() const { return reader_.get(); }

  void set_num_threads(int num_threads) { num_threads_ = num_threads; }

 private:
  MemoryPool* pool_;
  std::unique_ptr<ParquetFileReader> reader_;

  int num_threads_;
};

class FlatColumnReader::Impl {
Expand Down Expand Up @@ -125,7 +134,7 @@ class FlatColumnReader::Impl {
};

FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
    : pool_(pool), reader_(std::move(reader)) {}
    : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}

bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) {
  if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
@@ -156,19 +165,73 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
}

Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
  std::vector<int> column_indices(reader_->metadata()->num_columns());

  for (size_t i = 0; i < column_indices.size(); ++i) {
    column_indices[i] = i;
  }
  return ReadFlatTable(column_indices, table);
}

template <class FUNCTION>
Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
  std::vector<std::thread> thread_pool;
  thread_pool.reserve(nthreads);
  std::atomic<int> task_counter(0);

  std::mutex error_mtx;
  bool error_occurred = false;
  Status error;

  for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
    thread_pool.emplace_back(
        [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
          int task_id;
          while (!error_occurred) {
            task_id = task_counter.fetch_add(1);
            if (task_id >= num_tasks) { break; }
            Status s = func(task_id);
            if (!s.ok()) {
              std::lock_guard<std::mutex> lock(error_mtx);
              error_occurred = true;
              error = s;
              break;
            }
          }
        });
  }
  for (auto&& thread : thread_pool) {
    thread.join();
  }
  if (error_occurred) { return error; }
  return Status::OK();
}

Status FileReader::Impl::ReadFlatTable(
    const std::vector<int>& indices, std::shared_ptr<Table>* table) {
  auto descr = reader_->metadata()->schema();

  const std::string& name = descr->name();
  std::shared_ptr<::arrow::Schema> schema;
  RETURN_NOT_OK(FromParquetSchema(descr, &schema));

  int num_columns = reader_->metadata()->num_columns();
  RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema));

  int num_columns = static_cast<int>(indices.size());
  int nthreads = std::min<int>(num_threads_, num_columns);
  std::vector<std::shared_ptr<Column>> columns(num_columns);
  for (int i = 0; i < num_columns; i++) {

  auto ReadColumn = [&indices, &schema, &columns, this](int i) {
    std::shared_ptr<Array> array;
    RETURN_NOT_OK(ReadFlatColumn(i, &array));
    columns[i] = std::make_shared<Column>(schema->field(i), array);
    RETURN_NOT_OK(ReadFlatColumn(indices[i], &array));
    columns[i] = std::make_shared<Column>(schema->field(indices[i]), array);
    return Status::OK();
  };

  if (nthreads == 1) {
    for (int i = 0; i < num_columns; i++) {
      RETURN_NOT_OK(ReadColumn(i));
    }
  } else {
    RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumn));
  }

  *table = std::make_shared<Table>(name, schema, columns);
@@ -218,6 +281,19 @@ Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
  }
}

Status FileReader::ReadFlatTable(
    const std::vector<int>& column_indices, std::shared_ptr<Table>* out) {
  try {
    return impl_->ReadFlatTable(column_indices, out);
  } catch (const ::parquet::ParquetException& e) {
    return ::arrow::Status::IOError(e.what());
  }
}

void FileReader::set_num_threads(int num_threads) {
  impl_->set_num_threads(num_threads);
}

const ParquetFileReader* FileReader::parquet_reader() const {
  return impl_->parquet_reader();
}
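The ParallelFor helper above carries the heart of the change: a fixed pool of std::thread workers repeatedly claims task ids from a shared atomic counter, and each worker bails out early once any task returns a non-OK Status. Below is a small standalone sketch of the same claim-the-next-task-id pattern; it is illustrative only, not code from this commit, and the squaring task body is a placeholder for "read column task_id".

// Standalone sketch (not from this commit) of the dynamic task-claiming
// pattern used by ParallelFor above; the task body is a placeholder.
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  const int nthreads = 4;
  const int num_tasks = 20;
  std::vector<int> results(num_tasks, 0);
  std::atomic<int> task_counter(0);

  std::vector<std::thread> pool;
  pool.reserve(nthreads);
  for (int t = 0; t < nthreads; ++t) {
    pool.emplace_back([&]() {
      while (true) {
        // Each worker atomically claims the next unprocessed task id.
        int task_id = task_counter.fetch_add(1);
        if (task_id >= num_tasks) { break; }
        results[task_id] = task_id * task_id;  // placeholder for "read column task_id"
      }
    });
  }
  for (auto& thread : pool) {
    thread.join();
  }

  for (int r : results) {
    std::cout << r << " ";
  }
  std::cout << std::endl;
  return 0;
}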
12 changes: 12 additions & 0 deletions cpp/src/parquet/arrow/reader.h
@@ -19,6 +19,7 @@
#define PARQUET_ARROW_READER_H

#include <memory>
#include <vector>

#include "parquet/api/reader.h"
#include "parquet/api/schema.h"
@@ -94,13 +95,24 @@ class PARQUET_EXPORT FileReader {
  //
  // Returns error status if the column of interest is not flat.
  ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);

  // Read column as a whole into an Array.
  ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);

  // Read a table of flat columns into a Table.
  ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);

  // Read a table of flat columns into a Table. Read only the indicated column
  // indices (relative to the schema)
  ::arrow::Status ReadFlatTable(
      const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);

  const ParquetFileReader* parquet_reader() const;

  /// Set the number of threads to use during reads of multiple columns. By
  /// default only 1 thread is used
  void set_num_threads(int num_threads);

  virtual ~FileReader();

 private:
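A hedged sketch of the column-subset ReadFlatTable overload declared above (possibly the PARQUET-836 piece mentioned in the commit message): given an already-opened FileReader, read only a couple of columns, with the thread setting still applying. The function name, the chosen indices, and the include path are assumptions for illustration.

// Hedged sketch (not from this commit): read only columns 0 and 3 of an
// already-opened parquet::arrow::FileReader into an arrow::Table.
#include <memory>
#include <vector>

#include "parquet/arrow/reader.h"  // assumed header for parquet::arrow::FileReader

::arrow::Status ReadSelectedColumns(parquet::arrow::FileReader* reader,
    std::shared_ptr<::arrow::Table>* out) {
  std::vector<int> column_indices = {0, 3};

  // The thread pool applies here too: the selected columns are read in parallel.
  reader->set_num_threads(2);
  return reader->ReadFlatTable(column_indices, out);
}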
19 changes: 17 additions & 2 deletions cpp/src/parquet/arrow/schema.cc
@@ -250,8 +250,6 @@ Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {

Status FromParquetSchema(
    const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) {
  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
  // from the root Parquet node
  const GroupNode* schema_node = parquet_schema->group_node();

  std::vector<std::shared_ptr<Field>> fields(schema_node->field_count());
@@ -263,6 +261,23 @@
  return Status::OK();
}

Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
    const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out) {
  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
  // from the root Parquet node
  const GroupNode* schema_node = parquet_schema->group_node();

  int num_fields = static_cast<int>(column_indices.size());

  std::vector<std::shared_ptr<Field>> fields(num_fields);
  for (int i = 0; i < num_fields; i++) {
    RETURN_NOT_OK(NodeToField(schema_node->field(column_indices[i]), &fields[i]));
  }

  *out = std::make_shared<::arrow::Schema>(fields);
  return Status::OK();
}

Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
    bool nullable, const WriterProperties& properties, NodePtr* out) {
  Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
4 changes: 4 additions & 0 deletions cpp/src/parquet/arrow/schema.h
@@ -19,6 +19,7 @@
#define PARQUET_ARROW_SCHEMA_H

#include <memory>
#include <vector>

#include "arrow/schema.h"
#include "arrow/type.h"
@@ -40,6 +41,9 @@ namespace arrow {
::arrow::Status PARQUET_EXPORT NodeToField(
    const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out);

::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
    const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out);

::arrow::Status PARQUET_EXPORT FromParquetSchema(
    const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);

