Skip to content

Commit

Permalink
ARROW-91: Basic Parquet read support
Browse files Browse the repository at this point in the history
  • Loading branch information
xhochy committed May 8, 2016
1 parent d9940d8 commit 8d2db22
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 8 deletions.
4 changes: 4 additions & 0 deletions cpp/src/arrow/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# arrow_parquet : Arrow <-> Parquet adapter

# Source files of the Arrow <-> Parquet adapter, collected in PARQUET_SRCS.
# NOTE(review): the target that consumes this list is outside the visible
# hunk -- presumably arrow_parquet; confirm against the full file.
set(PARQUET_SRCS
reader.cc
schema.cc
)

Expand All @@ -36,6 +37,9 @@ SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX)
# Tests for the adapter: one for the schema conversion and one for the reader.
# Each test is linked against arrow_parquet.
ADD_ARROW_TEST(parquet-schema-test)
ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)

ADD_ARROW_TEST(parquet-reader-test)
ARROW_TEST_LINK_LIBRARIES(parquet-reader-test arrow_parquet)

# Headers: top level
install(FILES
DESTINATION include/arrow/parquet)
120 changes: 120 additions & 0 deletions cpp/src/arrow/parquet/parquet-reader-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "gtest/gtest.h"

#include "arrow/test-util.h"
#include "arrow/parquet/reader.h"
#include "arrow/types/primitive.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

#include "parquet/api/schema.h"
#include "parquet/column/writer.h"
#include "parquet/file/reader.h"
#include "parquet/file/writer.h"
#include "parquet/util/input.h"
#include "parquet/util/output.h"

using ParquetBuffer = parquet::Buffer;
using parquet::BufferReader;
using parquet::InMemoryOutputStream;
using parquet::Int64Writer;
using parquet::ParquetFileReader;
using parquet::ParquetFileWriter;
using parquet::RandomAccessSource;
using parquet::Repetition;
using parquet::SchemaDescriptor;
using ParquetType = parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;

namespace arrow {

namespace parquet {

// Test fixture that builds small Parquet files in memory and hands back a
// ParquetFileReader positioned over the written bytes.
class TestReadParquet : public ::testing::Test {
 public:
  virtual void SetUp() {}

  // Builds a schema whose root group "schema" contains exactly one REQUIRED
  // INT64 column named "int64".
  std::shared_ptr<GroupNode> Int64Schema() {
    auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64);
    NodePtr node_ =
        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
    return std::static_pointer_cast<GroupNode>(node_);
  }

  // Writes `values` into an in-memory Parquet file using the schema from
  // Int64Schema(), split over `num_chunks` row groups, and returns a reader
  // opened on the resulting buffer.
  //
  // Every row group receives values.size() / num_chunks values; the last one
  // additionally receives the remainder so that no trailing values are
  // silently dropped when the size is not evenly divisible.
  //
  // `num_chunks` must be greater than zero.
  std::unique_ptr<ParquetFileReader> Int64File(
      std::vector<int64_t>& values, int num_chunks) {
    std::shared_ptr<GroupNode> schema = Int64Schema();
    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
    auto file_writer = ParquetFileWriter::Open(sink, schema);
    const size_t chunk_size = values.size() / num_chunks;
    size_t offset = 0;
    for (int i = 0; i < num_chunks; i++) {
      // The final row group absorbs whatever the integer division left over.
      const bool is_last = (i == num_chunks - 1);
      const size_t rows = is_last ? (values.size() - offset) : chunk_size;
      auto row_group_writer = file_writer->AppendRowGroup(rows);
      auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
      int64_t* data = values.data() + offset;
      // No definition or repetition levels: the column is REQUIRED and flat.
      column_writer->WriteBatch(rows, nullptr, nullptr, data);
      column_writer->Close();
      row_group_writer->Close();
      offset += rows;
    }
    file_writer->Close();

    std::shared_ptr<ParquetBuffer> buffer = sink->GetBuffer();
    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
    return ParquetFileReader::Open(std::move(source));
  }
};

// Writes 100 INT64 values into a file with a single row group and checks that
// one NextBatch(100) call on column 0 yields the same values.
TEST_F(TestReadParquet, SingleColumnInt64) {
  std::vector<int64_t> values(100, 128);
  auto parquet_reader = Int64File(values, 1);
  arrow::parquet::FileReader reader(default_memory_pool(), std::move(parquet_reader));

  std::unique_ptr<arrow::parquet::FlatColumnReader> flat_reader;
  ASSERT_OK(reader.GetFlatColumn(0, &flat_reader));
  ASSERT_NE(nullptr, flat_reader.get());

  std::shared_ptr<Array> result;
  ASSERT_OK(flat_reader->NextBatch(100, &result));
  ASSERT_NE(nullptr, result.get());

  // The column was written as INT64, so the result is viewed as an Int64Array.
  auto* int64_result = static_cast<Int64Array*>(result.get());
  size_t idx = 0;
  while (idx < values.size()) {
    EXPECT_EQ(values[idx], int64_result->raw_data()[idx]);
    ++idx;
  }
}

// Writes 100 INT64 values spread over four row groups and checks that a single
// NextBatch(100) call on column 0 stitches them back together in order.
TEST_F(TestReadParquet, SingleColumnInt64Chunked) {
  // Use distinct values (0..99): with a constant fill, row groups that were
  // reordered, duplicated or skipped would still compare equal.
  std::vector<int64_t> values(100);
  for (size_t i = 0; i < values.size(); i++) {
    values[i] = static_cast<int64_t>(i);
  }
  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
  arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
  std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
  ASSERT_OK(reader.GetFlatColumn(0, &column_reader));
  ASSERT_NE(nullptr, column_reader.get());
  std::shared_ptr<Array> out;
  ASSERT_OK(column_reader->NextBatch(100, &out));
  ASSERT_NE(nullptr, out.get());
  // The column was written as INT64, so the result is viewed as an Int64Array.
  Int64Array* out_array = static_cast<Int64Array*>(out.get());
  for (size_t i = 0; i < values.size(); i++) {
    EXPECT_EQ(values[i], out_array->raw_data()[i]);
  }
}

} // namespace parquet

} // namespace arrow
132 changes: 130 additions & 2 deletions cpp/src/arrow/parquet/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,141 @@

#include "arrow/parquet/reader.h"

#include <queue>

#include "arrow/parquet/schema.h"
#include "arrow/schema.h"
#include "arrow/types/primitive.h"
#include "arrow/util/status.h"

using parquet::ColumnReader;
using parquet::TypedColumnReader;

namespace arrow {
namespace parquet {

// Private implementation of FileReader. Keeps the memory pool pointer and
// owns the underlying Parquet file reader.
class FileReader::Impl {
 public:
  // Stores `pool` and takes ownership of `reader`.
  Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
  virtual ~Impl() = default;

  // Creates a FlatColumnReader for column `i` and returns it through `out`.
  Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);

 private:
  MemoryPool* pool_;
  std::unique_ptr<::parquet::ParquetFileReader> reader_;
};

} // namespace parquet
} // namespace arrow
// Private implementation of FlatColumnReader. Holds a queue of Parquet column
// readers for one column together with that column's descriptor and the Arrow
// field derived from it.
class FlatColumnReader::Impl {
 public:
  // `descr` describes the column; `column_readers` is the queue of readers
  // that will be consumed front to back.
  Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
      std::queue<std::shared_ptr<ColumnReader>>&& column_readers);
  virtual ~Impl() = default;

  // Reads up to `batch_size` values into a new Array returned through `out`.
  Status NextBatch(int batch_size, std::shared_ptr<Array>* out);

  // Type-specific worker behind NextBatch: ArrowType selects the builder,
  // ParquetType the typed column reader, CType the value buffer element type.
  template <typename ArrowType, typename ParquetType, typename CType>
  Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);

 private:
  MemoryPool* pool_;
  const ::parquet::ColumnDescriptor* descr_;
  std::queue<std::shared_ptr<ColumnReader>> column_readers_;
  std::shared_ptr<Field> field_;
};

FileReader::Impl::Impl(
MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
: pool_(pool), reader_(std::move(reader)) {}

// Creates a FlatColumnReader for column `i`.
//
// One Parquet ColumnReader is collected per row group, in row-group order, and
// handed to the FlatColumnReader implementation together with the column's
// descriptor.
//
// Returns Status::Invalid if `i` is not a valid column index (the schema
// descriptor and row groups are indexed with it directly, so an unchecked
// index would be out of bounds). Otherwise stores the new reader in `*out`
// and returns Status::OK().
Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
  const int num_columns = reader_->descr()->num_columns();
  if (i < 0 || i >= num_columns) {
    return Status::Invalid("column index out of range");
  }

  std::queue<std::shared_ptr<ColumnReader>> column_readers;
  const int num_row_groups = reader_->num_row_groups();
  for (int rg = 0; rg < num_row_groups; rg++) {
    column_readers.push(reader_->RowGroup(rg)->Column(i));
  }
  std::unique_ptr<FlatColumnReader::Impl> impl(new FlatColumnReader::Impl(
      pool_, reader_->descr()->Column(i), std::move(column_readers)));
  *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
  return Status::OK();
}

FileReader::FileReader(
MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
: impl_(new FileReader::Impl(pool, std::move(reader))) {}

// Defined out of line in this file, where FileReader::Impl is a complete type.
FileReader::~FileReader() = default;

// Returns, through `out`, a FlatColumnReader for column `i`.
// Forwards to the private implementation and returns its Status unchanged.
Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
return impl_->GetFlatColumn(i, out);
}

// Read column `i` as a whole into an Array.
//
// Not implemented yet. Reports Status::NotImplemented instead of Status::OK()
// so that callers do not mistake an untouched `*out` for a successfully read
// array. `*out` is left unmodified.
Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
  return Status::NotImplemented("ReadFlatColumn is not implemented yet");
}

// Stores the pool and column descriptor, takes over the queue of column
// readers, and derives the Arrow field for the column from its schema node.
FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
    std::queue<std::shared_ptr<ColumnReader>>&& column_readers)
    // A named rvalue reference is an lvalue: without std::move the queue (and
    // every shared_ptr in it) would be copied instead of moved.
    : pool_(pool), descr_(descr), column_readers_(std::move(column_readers)) {
  // NOTE(review): the Status returned by NodeToField is discarded because a
  // constructor cannot report it; whether field_ is left unset on failure
  // depends on NodeToField, which is defined outside this file.
  NodeToField(descr_->schema_node(), &field_);
}

// Reads up to `batch_size` values of the column into a freshly built array.
//
// Column readers are consumed from the front of the queue; a reader is popped
// once it reports that it has nothing left, and reading continues with the
// next one until the batch is full or the queue is empty.
//
// Template parameters: ArrowType selects the NumericBuilder, ParquetType the
// TypedColumnReader to cast to, CType the element type of the value buffer.
//
// Returns:
//   - Status::NotImplemented if the column has definition levels,
//   - Status::Invalid if the front reader is not a TypedColumnReader<ParquetType>,
//   - the builder's error if appending fails,
//   - Status::OK() otherwise, with the built array stored in `*out`.
template <typename ArrowType, typename ParquetType, typename CType>
Status FlatColumnReader::Impl::TypedReadBatch(
    int batch_size, std::shared_ptr<Array>* out) {
  int values_to_read = batch_size;
  NumericBuilder<ArrowType> builder(pool_, field_->type);
  while ((values_to_read > 0) && (column_readers_.size() > 0)) {
    // Reject unsupported columns before touching the reader so that no values
    // are consumed from it (and lost) on the error path.
    if (descr_->max_definition_level() != 0) {
      return Status::NotImplemented("no support for definition levels yet");
    }
    auto reader =
        dynamic_cast<TypedColumnReader<ParquetType>*>(column_readers_.front().get());
    if (reader == nullptr) {
      // dynamic_cast yields null when the reader's physical type differs.
      return Status::Invalid("column reader does not match the requested type");
    }
    // TODO: This is a lot of malloc-thrashing and not using the memory pool.
    std::vector<CType> values(values_to_read);
    std::vector<int16_t> def_levels(values_to_read);
    int64_t values_read = 0;
    values_to_read -= reader->ReadBatch(
        values_to_read, def_levels.data(), nullptr, values.data(), &values_read);
    Status append_status = builder.Append(values.data(), values_read);
    if (!append_status.ok()) { return append_status; }
    if (!column_readers_.front()->HasNext()) { column_readers_.pop(); }
  }
  *out = builder.Finish();
  return Status::OK();
}

// Expands to one `case Type::ENUM_NAME:` label that returns the result of
// TypedReadBatch instantiated with the given Arrow type, Parquet type and C
// value type. Expects `batch_size` and `out` to be in scope at the use site.
#define TYPED_BATCH_CASE(ENUM_NAME, ARROW_T, PARQUET_T, C_T) \
  case Type::ENUM_NAME:                                      \
    return TypedReadBatch<ARROW_T, PARQUET_T, C_T>(batch_size, out);

// Reads the next batch of up to `batch_size` values into `*out`.
//
// Returns:
//   - Status::OK() with `*out` set to null once all column readers are
//     exhausted,
//   - Status::NotImplemented for repeated columns, for columns whose Arrow
//     field is unavailable, and for Arrow types other than INT32, INT64,
//     FLOAT and DOUBLE (`*out` is null in the latter two cases),
//   - otherwise whatever the matching TypedReadBatch instantiation returns.
Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
  if (column_readers_.size() == 0) {
    // Exhausted all readers.
    *out = std::shared_ptr<Array>(nullptr);
    // Stop here: falling through would build and return an empty array
    // instead of the null result set above.
    return Status::OK();
  }

  if (descr_->max_repetition_level() > 0) {
    return Status::NotImplemented("no support for repetition yet");
  }

  *out = std::shared_ptr<Array>(nullptr);
  if (field_ == nullptr) {
    // The constructor discards the Status of NodeToField, so the field may be
    // missing; report that rather than dereferencing a null pointer below.
    return Status::NotImplemented("column could not be converted to an Arrow field");
  }
  switch (field_->type->type) {
    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t)
    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t)
    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float)
    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double)
    default:
      return Status::NotImplemented(field_->type->ToString());
  }
}

FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}

// Defined out of line in this file, where FlatColumnReader::Impl is a complete
// type.
FlatColumnReader::~FlatColumnReader() = default;

// Reads the next batch of up to `batch_size` values into `*out`.
// Forwards to the private implementation and returns its Status unchanged.
Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
return impl_->NextBatch(batch_size, out);
}

} // namespace parquet
} // namespace arrow
16 changes: 11 additions & 5 deletions cpp/src/arrow/parquet/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ class FlatColumnReader;
// arrays
class FileReader {
public:
ArrowReader(MemoryPool* pool,
std::unique_ptr<::parquet::ParquetFileReader> reader);
FileReader(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);

// Since the distribution of columns amongst a Parquet file's row groups may
// be uneven (the number of values in each column chunk can be different), we
Expand All @@ -89,6 +88,10 @@ class FileReader {
//
// Returns error status if the column of interest is not flat.
Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
// Read column as a whole into an Array.
Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);

virtual ~FileReader();

private:
class Impl;
Expand All @@ -103,6 +106,8 @@ class FileReader {
// might change in the future.
class FlatColumnReader {
public:
virtual ~FlatColumnReader();

// Scan the next array of the indicated size. The actual size of the
// returned array may be less than the passed size depending how much data is
// available in the file.
Expand All @@ -117,12 +122,13 @@ class FlatColumnReader {
private:
class Impl;
std::unique_ptr<Impl> impl_;
FlatColumnReader(std::unique_ptr<Impl> impl);

friend class FileReader;
};

} // namespace parquet
} // namespace parquet

} // namespace arrow
} // namespace arrow

#endif ARROW_PARQUET_READER_H
#endif // ARROW_PARQUET_READER_H
2 changes: 1 addition & 1 deletion cpp/src/arrow/parquet/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ Status ToParquetSchema(

} // namespace arrow

#endif // ARROW_PARQUET_SCHEMA_H
#endif // ARROW_PARQUET_SCHEMA_H

0 comments on commit 8d2db22

Please sign in to comment.