diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 193c76feba1..6159f67e361 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -2,6 +2,10 @@
 
 set -e
 
+source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
+conda install -y --channel apache/channel/dev parquet-cpp
+export PARQUET_HOME=$MINICONDA
+
 : ${CPP_BUILD_DIR=$TRAVIS_BUILD_DIR/cpp-build}
 
 mkdir $CPP_BUILD_DIR
@@ -19,7 +23,7 @@ echo $GTEST_HOME
 
 : ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install}
 
-CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
+CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DARROW_PARQUET=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
 
 if [ $TRAVIS_OS_NAME == "linux" ]; then
   cmake -DARROW_TEST_MEMCHECK=on $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
diff --git a/ci/travis_conda_build.sh b/ci/travis_conda_build.sh
index afa531dbd6b..c43a85170b0 100755
--- a/ci/travis_conda_build.sh
+++ b/ci/travis_conda_build.sh
@@ -2,27 +2,7 @@
 
 set -e
 
-if [ $TRAVIS_OS_NAME == "linux" ]; then
-  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
-else
-  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
-fi
-
-wget -O miniconda.sh $MINICONDA_URL
-MINICONDA=$TRAVIS_BUILD_DIR/miniconda
-bash miniconda.sh -b -p $MINICONDA
-export PATH="$MINICONDA/bin:$PATH"
-conda update -y -q conda
-conda info -a
-
-conda config --set show_channel_urls yes
-conda config --add channels conda-forge
-conda config --add channels apache
-
-conda install --yes conda-build jinja2 anaconda-client
-
-# faster builds, please
-conda install -y nomkl
+source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
 
 # Build libarrow
diff --git a/ci/travis_install_conda.sh b/ci/travis_install_conda.sh
new file mode 100644
index 00000000000..bef667dff7c
--- /dev/null
+++ b/ci/travis_install_conda.sh
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+set -e
+
+if [ $TRAVIS_OS_NAME == "linux" ]; then
+  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
+else
+  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
+fi
+
+wget -O miniconda.sh $MINICONDA_URL
+export MINICONDA=$TRAVIS_BUILD_DIR/miniconda
+bash miniconda.sh -b -p $MINICONDA
+export PATH="$MINICONDA/bin:$PATH"
+conda update -y -q conda
+conda info -a
+
+conda config --set show_channel_urls yes
+conda config --add channels conda-forge
+conda config --add channels apache
+
+conda install --yes conda-build jinja2 anaconda-client
+
+# faster builds, please
+conda install -y nomkl
+
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index d45b895d8cf..6d35785356a 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -4,6 +4,12 @@ set -e
 
 PYTHON_DIR=$TRAVIS_BUILD_DIR/python
 
+# Re-use conda installation from C++
+export MINICONDA=$TRAVIS_BUILD_DIR/miniconda
+export PATH="$MINICONDA/bin:$PATH"
+export LD_LIBRARY_PATH="$MINICONDA/lib:$LD_LIBRARY_PATH"
+export PARQUET_HOME=$MINICONDA
+
 # Share environment with C++
 pushd $CPP_BUILD_DIR
 source setup_build_env.sh
@@ -11,21 +17,6 @@ popd
 
 pushd $PYTHON_DIR
 
-# Bootstrap a Conda Python environment
-
-if [ $TRAVIS_OS_NAME == "linux" ]; then
-  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
-else
-  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
-fi
-
-curl $MINICONDA_URL > miniconda.sh
-MINICONDA=$TRAVIS_BUILD_DIR/miniconda
-bash miniconda.sh -b -p $MINICONDA
-export PATH="$MINICONDA/bin:$PATH"
-conda update -y -q conda
-conda info -a
-
 python_version_tests() {
   PYTHON_VERSION=$1
   CONDA_ENV_NAME="pyarrow-test-${PYTHON_VERSION}"
diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h
index 22becc34547..e409566e1f1 100644
--- a/cpp/src/arrow/column.h
+++ b/cpp/src/arrow/column.h
@@ -67,6 +67,8 @@ class Column {
 
   int64_t null_count() const { return data_->null_count(); }
 
+  const std::shared_ptr<Field>& field() const { return field_; }
+
   // @returns: the column's name in the passed metadata
   const std::string& name() const { return field_->name; }
diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt
index c00cc9f0f25..f00bb53c084 100644
--- a/cpp/src/arrow/parquet/CMakeLists.txt
+++ b/cpp/src/arrow/parquet/CMakeLists.txt
@@ -35,6 +35,13 @@ add_library(arrow_parquet SHARED
 target_link_libraries(arrow_parquet ${PARQUET_LIBS})
 SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX)
 
+if (APPLE)
+  set_target_properties(arrow_parquet
+    PROPERTIES
+    BUILD_WITH_INSTALL_RPATH ON
+    INSTALL_NAME_DIR "@rpath")
+endif()
+
 ADD_ARROW_TEST(parquet-schema-test)
 ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
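The new Column::field() accessor above is what lets the Parquet layer recover a column's full metadata (type and nullability), not just its name. A minimal usage sketch, assuming a populated table built elsewhere:

    // Hypothetical illustration of the new accessor.
    std::shared_ptr<arrow::Column> col = table->column(0);
    const std::shared_ptr<arrow::Field>& field = col->field();
    // field->name matches col->name(); field->type and field->nullable
    // are what the Parquet schema conversion needs.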
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index 845574d2c53..db779d8309c 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -18,6 +18,7 @@
 #include "gtest/gtest.h"
 
 #include "arrow/test-util.h"
+#include "arrow/parquet/test-util.h"
 #include "arrow/parquet/reader.h"
 #include "arrow/parquet/writer.h"
 #include "arrow/types/primitive.h"
@@ -44,36 +45,45 @@ namespace arrow {
 
 namespace parquet {
 
-template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NonNullArray(
-    size_t size, typename ArrowType::c_type value) {
-  std::vector<typename ArrowType::c_type> values(size, value);
-  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
-  builder.Append(values.data(), values.size());
-  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
-}
+const int SMALL_SIZE = 100;
+const int LARGE_SIZE = 10000;
 
-// This helper function only supports (size/2) nulls yet.
-template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NullableArray(
-    size_t size, typename ArrowType::c_type value, size_t num_nulls) {
-  std::vector<typename ArrowType::c_type> values(size, value);
-  std::vector<uint8_t> valid_bytes(size, 1);
+template <typename ArrowType>
+struct test_traits {};
 
-  for (size_t i = 0; i < num_nulls; i++) {
-    valid_bytes[i * 2] = 0;
-  }
+template <>
+struct test_traits<Int32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+};
 
-  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
-  builder.Append(values.data(), values.size(), valid_bytes.data());
-  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
-}
+template <>
+struct test_traits<Int64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+};
+
+template <>
+struct test_traits<FloatType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+};
+
+template <>
+struct test_traits<DoubleType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+};
+
+template <typename T>
+using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>;
 
+template <typename T>
+using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>;
+
+template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
+  typedef typename TestType::c_type T;
+
   virtual void SetUp() {}
 
-  std::shared_ptr<GroupNode> Schema(
+  std::shared_ptr<GroupNode> MakeSchema(
       ParquetType::type parquet_type, Repetition::type repetition) {
     auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type);
     NodePtr node_ =
@@ -98,20 +108,27 @@ class TestParquetIO : public ::testing::Test {
     std::unique_ptr<FlatColumnReader> column_reader;
     ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
     ASSERT_NE(nullptr, column_reader.get());
-    ASSERT_OK(column_reader->NextBatch(100, out));
+    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadTableFromFile(
+      std::unique_ptr<::parquet::ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+    arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
+    ASSERT_NO_THROW(ASSERT_OK(reader.ReadFlatTable(out)));
     ASSERT_NE(nullptr, out->get());
   }
 
-  std::unique_ptr<::parquet::ParquetFileReader> Int64File(
-      std::vector<int64_t>& values, int num_chunks) {
-    std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+  std::unique_ptr<::parquet::ParquetFileReader> TestFile(std::vector<T>& values, int num_chunks) {
+    std::shared_ptr<GroupNode> schema =
+        MakeSchema(test_traits<TestType>::parquet_enum, Repetition::REQUIRED);
     std::unique_ptr<::parquet::ParquetFileWriter> file_writer = MakeWriter(schema);
     size_t chunk_size = values.size() / num_chunks;
     for (int i = 0; i < num_chunks; i++) {
       auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
-      auto column_writer =
-          static_cast<::parquet::Int64Writer*>(row_group_writer->NextColumn());
-      int64_t* data = values.data() + i * chunk_size;
+      auto column_writer = static_cast<ParquetWriter<TestType>*>(
+          row_group_writer->NextColumn());
+      T* data = values.data() + i * chunk_size;
       column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
       column_writer->Close();
       row_group_writer->Close();
@@ -120,71 +137,135 @@
     return ReaderFromSink();
   }
 
- private:
   std::shared_ptr<InMemoryOutputStream> sink_;
 };
 
-TEST_F(TestParquetIO, SingleColumnInt64Read) {
-  std::vector<int64_t> values(100, 128);
-  std::unique_ptr<::parquet::ParquetFileReader> file_reader = Int64File(values, 1);
+typedef ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredRead) {
+  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+  std::unique_ptr<::parquet::ParquetFileReader> file_reader = this->TestFile(values, 1);
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(std::move(file_reader), &out);
+  this->ReadSingleColumnFile(std::move(file_reader), &out);
 
-  Int64Array* out_array = static_cast<Int64Array*>(out.get());
-  for (size_t i = 0; i < values.size(); i++) {
-    EXPECT_EQ(values[i], out_array->raw_data()[i]);
-  }
+  ExpectArray(values.data(), out.get());
 }
 
-TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) {
-  std::vector<int64_t> values(100, 128);
-  std::unique_ptr<::parquet::ParquetFileReader> file_reader = Int64File(values, 4);
+TYPED_TEST(TestParquetIO, SingleColumnRequiredTableRead) {
+  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+  std::unique_ptr<::parquet::ParquetFileReader> file_reader = this->TestFile(values, 1);
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(std::move(file_reader), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ExpectArray(values.data(), chunked_array->chunk(0).get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedRead) {
+  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+  std::unique_ptr<::parquet::ParquetFileReader> file_reader = this->TestFile(values, 4);
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(std::move(file_reader), &out);
+  this->ReadSingleColumnFile(std::move(file_reader), &out);
 
-  Int64Array* out_array = static_cast<Int64Array*>(out.get());
-  for (size_t i = 0; i < values.size(); i++) {
-    EXPECT_EQ(values[i], out_array->raw_data()[i]);
-  }
+  ExpectArray(values.data(), out.get());
 }
 
-TEST_F(TestParquetIO, SingleColumnInt64Write) {
-  std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedTableRead) {
+  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+  std::unique_ptr<::parquet::ParquetFileReader> file_reader = this->TestFile(values, 4);
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(std::move(file_reader), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(SMALL_SIZE, out->num_rows());
 
-  std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
-  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ExpectArray(values.data(), chunked_array->chunk(0).get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+
+  std::shared_ptr<GroupNode> schema =
+      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
   ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
   ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(ReaderFromSink(), &out);
+  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
   ASSERT_TRUE(values->Equals(out));
 }
 
-TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) {
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_NO_THROW(ASSERT_OK(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
   // This also tests max_definition_level = 1
-  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
 
-  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
-  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  std::shared_ptr<GroupNode> schema =
+      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
   ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
   ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(ReaderFromSink(), &out);
+  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
   ASSERT_TRUE(values->Equals(out));
 }
 
-TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
-  std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
-  std::shared_ptr<PrimitiveArray> values_chunk = NonNullArray<Int64Type>(25, 128);
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_NO_THROW(ASSERT_OK(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
 
-  std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
-  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+  std::shared_ptr<PrimitiveArray> values_chunk =
+      NonNullArray<TypeParam>(SMALL_SIZE / 4, 128);
+
+  std::shared_ptr<GroupNode> schema =
+      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
     ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
@@ -192,18 +273,37 @@ TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
   ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(ReaderFromSink(), &out);
+  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
   ASSERT_TRUE(values->Equals(out));
 }
 
-TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) {
-  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(LARGE_SIZE, 128);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_NO_THROW(
+      ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(LARGE_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
   std::shared_ptr<PrimitiveArray> values_chunk_nulls =
-      NullableArray<DoubleType>(25, 128, 10);
-  std::shared_ptr<PrimitiveArray> values_chunk = NullableArray<DoubleType>(25, 128, 0);
+      NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 10);
+  std::shared_ptr<PrimitiveArray> values_chunk =
+      NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 0);
 
-  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
-  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  std::shared_ptr<GroupNode> schema =
+      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length())));
   ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get())));
   for (int i = 0; i < 3; i++) {
@@ -213,10 +313,28 @@
   ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(ReaderFromSink(), &out);
+  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
   ASSERT_TRUE(values->Equals(out));
 }
 
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(LARGE_SIZE, 128, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_NO_THROW(
+      ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(LARGE_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
 } // namespace parquet
 } // namespace arrow
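The typed tests above fan out over the supported Arrow types through test_traits, which maps each Arrow type to its Parquet physical type at compile time. Extending coverage to another type would be one more specialization plus an entry in TestTypes; a hypothetical sketch (BooleanType is not wired up in this patch):

    // Hypothetical: not part of this change.
    template <>
    struct test_traits<BooleanType> {
      static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
    };
    // ...and add BooleanType to the ::testing::Types<...> list above.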
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index 346de253606..3b4882d4439 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -18,10 +18,14 @@
 #include "arrow/parquet/reader.h"
 
 #include <memory>
+#include <string>
+#include <vector>
 
+#include "arrow/column.h"
 #include "arrow/parquet/schema.h"
 #include "arrow/parquet/utils.h"
 #include "arrow/schema.h"
+#include "arrow/table.h"
 #include "arrow/types/primitive.h"
 #include "arrow/util/status.h"
@@ -40,6 +44,7 @@ class FileReader::Impl {
   bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr);
   Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
   Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+  Status ReadFlatTable(std::shared_ptr<Table>* out);
 
  private:
   MemoryPool* pool_;
@@ -103,6 +108,22 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
   return flat_column_reader->NextBatch(reader_->num_rows(), out);
 }
 
+Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+  const std::string& name = reader_->descr()->schema()->name();
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(FromParquetSchema(reader_->descr(), &schema));
+
+  std::vector<std::shared_ptr<Column>> columns(reader_->num_columns());
+  for (int i = 0; i < reader_->num_columns(); i++) {
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(ReadFlatColumn(i, &array));
+    columns[i] = std::make_shared<Column>(schema->field(i), array);
+  }
+
+  *table = std::make_shared<Table>(name, schema, columns);
+  return Status::OK();
+}
+
 FileReader::FileReader(
     MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
     : impl_(new FileReader::Impl(pool, std::move(reader))) {}
@@ -117,6 +138,10 @@ Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
   return impl_->ReadFlatColumn(i, out);
 }
 
+Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+  return impl_->ReadFlatTable(out);
+}
+
 FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
     ::parquet::ParquetFileReader* reader, int column_index)
     : pool_(pool),
diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h
index 41ca7eb35b9..db7a15753d8 100644
--- a/cpp/src/arrow/parquet/reader.h
+++ b/cpp/src/arrow/parquet/reader.h
@@ -29,6 +29,7 @@ class Array;
 class MemoryPool;
 class RowBatch;
 class Status;
+class Table;
 
 namespace parquet {
 
@@ -90,6 +91,8 @@ class FileReader {
   Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
   // Read column as a whole into an Array.
   Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+  // Read all flat columns of the file into a Table.
+  Status ReadFlatTable(std::shared_ptr<Table>* out);
 
   virtual ~FileReader();
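ReadFlatTable reads every column eagerly and assembles a Table named after the Parquet schema's root node. A minimal caller sketch, eliding status checks and using a placeholder path:

    // Sketch only; "example.parquet" is a placeholder.
    std::unique_ptr<::parquet::ParquetFileReader> pq_reader =
        ::parquet::ParquetFileReader::OpenFile("example.parquet");
    arrow::parquet::FileReader reader(
        arrow::default_memory_pool(), std::move(pq_reader));
    std::shared_ptr<arrow::Table> table;
    arrow::Status s = reader.ReadFlatTable(&table);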
diff --git a/cpp/src/arrow/parquet/test-util.h b/cpp/src/arrow/parquet/test-util.h
new file mode 100644
index 00000000000..1496082d5c6
--- /dev/null
+++ b/cpp/src/arrow/parquet/test-util.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "arrow/types/primitive.h"
+
+namespace arrow {
+
+namespace parquet {
+
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NonNullArray(
+    size_t size, typename ArrowType::c_type value) {
+  std::vector<typename ArrowType::c_type> values(size, value);
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+// This helper currently supports at most (size / 2) nulls; they are placed
+// at the even indices of the array.
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NullableArray(
+    size_t size, typename ArrowType::c_type value, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values(size, value);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+std::shared_ptr<Column> MakeColumn(const std::string& name,
+    const std::shared_ptr<Array>& array, bool nullable) {
+  auto field = std::make_shared<Field>(name, array->type(), nullable);
+  return std::make_shared<Column>(field, array);
+}
+
+std::shared_ptr<Table> MakeSimpleTable(
+    const std::shared_ptr<Array>& values, bool nullable) {
+  std::shared_ptr<Column> column = MakeColumn("col", values, nullable);
+  std::vector<std::shared_ptr<Column>> columns({column});
+  std::vector<std::shared_ptr<Field>> fields({column->field()});
+  auto schema = std::make_shared<Schema>(fields);
+  return std::make_shared<Table>("table", schema, columns);
+}
+
+template <typename T>
+void ExpectArray(T* expected, Array* result) {
+  PrimitiveArray* p_array = static_cast<PrimitiveArray*>(result);
+  for (size_t i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i], reinterpret_cast<const T*>(p_array->data()->data())[i]);
+  }
+}
+
+} // namespace parquet
+
+} // namespace arrow
diff --git a/cpp/src/arrow/parquet/utils.h b/cpp/src/arrow/parquet/utils.h
index b32792fdf70..409bcd9065c 100644
--- a/cpp/src/arrow/parquet/utils.h
+++ b/cpp/src/arrow/parquet/utils.h
@@ -31,6 +31,11 @@ namespace parquet {
     (s);                                                   \
   } catch (const ::parquet::ParquetException& e) { return Status::Invalid(e.what()); }
 
+#define PARQUET_IGNORE_NOT_OK(s) \
+  try {                          \
+    (s);                         \
+  } catch (const ::parquet::ParquetException& e) {}
+
 } // namespace parquet
 
 } // namespace arrow
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
index 3ad2c5b0735..1223901d550 100644
--- a/cpp/src/arrow/parquet/writer.cc
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -17,11 +17,21 @@
 
 #include "arrow/parquet/writer.h"
 
+#include <algorithm>
+#include <vector>
+
 #include "arrow/array.h"
+#include "arrow/column.h"
+#include "arrow/table.h"
+#include "arrow/types/construct.h"
 #include "arrow/types/primitive.h"
+#include "arrow/parquet/schema.h"
 #include "arrow/parquet/utils.h"
 #include "arrow/util/status.h"
 
+using parquet::ParquetFileWriter;
+using parquet::schema::GroupNode;
+
 namespace arrow {
 namespace parquet {
 
@@ -32,8 +42,9 @@ class FileWriter::Impl {
   Status NewRowGroup(int64_t chunk_size);
   template <typename ParquetType, typename ArrowType>
-  Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data);
-  Status WriteFlatColumnChunk(const PrimitiveArray* data);
+  Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data,
+      int64_t offset, int64_t length);
+  Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
   Status Close();
 
   virtual ~Impl() {}
@@ -60,31 +71,31 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
 }
 
 template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::TypedWriteBatch(
-    ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) {
+Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  // TODO: DCHECK((offset + length) <= data->length());
   auto data_ptr =
-      reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data());
+      reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data()) +
+      offset;
   auto writer =
       reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
   if (writer->descr()->max_definition_level() == 0) {
     // no nulls, just dump the data
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr));
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_ptr));
   } else if (writer->descr()->max_definition_level() == 1) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(data->length() * sizeof(int16_t)));
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
     int16_t* def_levels_ptr =
         reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
     if (data->null_count() == 0) {
-      std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1);
-      PARQUET_CATCH_NOT_OK(
-          writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr));
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, data_ptr));
     } else {
-      RETURN_NOT_OK(data_buffer_.Resize(
-          (data->length() - data->null_count()) * sizeof(typename ParquetType::c_type)));
+      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(typename ParquetType::c_type)));
       auto buffer_ptr =
           reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data());
       int buffer_idx = 0;
-      for (size_t i = 0; i < data->length(); i++) {
-        if (data->IsNull(i)) {
+      for (size_t i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
           def_levels_ptr[i] = 0;
         } else {
           def_levels_ptr[i] = 1;
@@ -92,7 +103,7 @@ Status FileWriter::Impl::TypedWriteBatch(
         }
       }
       PARQUET_CATCH_NOT_OK(
-          writer->WriteBatch(data->length(), def_levels_ptr, nullptr, buffer_ptr));
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
     }
   } else {
     return Status::NotImplemented("no support for max definition level > 1 yet");
@@ -107,12 +118,13 @@ Status FileWriter::Impl::Close() {
   return Status::OK();
 }
 
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                \
-  case Type::ENUM:                                                    \
-    return TypedWriteBatch<ParquetType, ArrowType>(writer, data);     \
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                          \
+  case Type::ENUM:                                                              \
+    return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
     break;
 
-Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) {
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
   ::parquet::ColumnWriter* writer;
   PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
   switch (data->type_enum()) {
@@ -133,8 +145,11 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) {
   return impl_->NewRowGroup(chunk_size);
 }
 
-Status FileWriter::WriteFlatColumnChunk(const PrimitiveArray* data) {
-  return impl_->WriteFlatColumnChunk(data);
+Status FileWriter::WriteFlatColumnChunk(
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  int64_t real_length = length;
+  if (length == -1) { real_length = data->length(); }
+  return impl_->WriteFlatColumnChunk(data, offset, real_length);
 }
 
 Status FileWriter::Close() {
@@ -143,6 +158,48 @@ Status FileWriter::Close() {
 
 FileWriter::~FileWriter() {}
 
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) {
+  std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
+  RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema());
+  std::unique_ptr<ParquetFileWriter> parquet_writer =
+      ParquetFileWriter::Open(sink, schema_node);
+  FileWriter writer(pool, std::move(parquet_writer));
+
+  // TODO: Support writing chunked arrays.
+  for (int i = 0; i < table->num_columns(); i++) {
+    if (table->column(i)->data()->num_chunks() != 1) {
+      return Status::NotImplemented("No support for writing chunked arrays yet.");
+    }
+  }
+
+  // Cast to PrimitiveArray instances as we work with them.
+  std::vector<std::shared_ptr<PrimitiveArray>> arrays(table->num_columns());
+  for (int i = 0; i < table->num_columns(); i++) {
+    // num_chunks == 1 as per above loop
+    std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
+    auto primitive_array = std::dynamic_pointer_cast<PrimitiveArray>(array);
+    if (!primitive_array) {
+      PARQUET_IGNORE_NOT_OK(writer.Close());
+      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+    }
+    arrays[i] = primitive_array;
+  }
+
+  for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
+    int64_t offset = chunk * chunk_size;
+    int64_t size = std::min(chunk_size, table->num_rows() - offset);
+    RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
+    for (int i = 0; i < table->num_columns(); i++) {
+      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(arrays[i].get(), offset, size),
+          PARQUET_IGNORE_NOT_OK(writer.Close()));
+    }
+  }
+
+  return writer.Close();
+}
+
 } // namespace parquet
 } // namespace arrow
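WriteFlatTable slices each column into row groups of at most chunk_size rows; the offset/length parameters on WriteFlatColumnChunk do the slicing without copying the arrays. A minimal caller sketch mirroring the Python binding further below ("example.parquet" is a placeholder path):

    // Sketch only; assumes `table` is a std::shared_ptr<arrow::Table>
    // of flat primitive columns.
    std::shared_ptr<::parquet::OutputStream> sink(
        new ::parquet::LocalFileOutputStream("example.parquet"));
    arrow::Status s = arrow::parquet::WriteFlatTable(
        table.get(), arrow::default_memory_pool(), sink, 1 << 16);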
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
index 38f7d0b3a89..83e799f7ed1 100644
--- a/cpp/src/arrow/parquet/writer.h
+++ b/cpp/src/arrow/parquet/writer.h
@@ -29,6 +29,7 @@ class MemoryPool;
 class PrimitiveArray;
 class RowBatch;
 class Status;
+class Table;
 
 namespace parquet {
 
@@ -42,7 +43,8 @@ class FileWriter {
   FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
 
   Status NewRowGroup(int64_t chunk_size);
-  Status WriteFlatColumnChunk(const PrimitiveArray* data);
+  Status WriteFlatColumnChunk(
+      const PrimitiveArray* data, int64_t offset = 0, int64_t length = -1);
   Status Close();
 
   virtual ~FileWriter();
@@ -52,6 +54,14 @@ class FileWriter {
   std::unique_ptr<Impl> impl_;
 };
 
+/**
+ * Write a flat Table to Parquet.
+ *
+ * The table must consist solely of flat, non-repeated columns of primitive
+ * type; the columns may be nullable.
+ */
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size);
+
 } // namespace parquet
 
 } // namespace arrow
diff --git a/cpp/src/arrow/util/status.h b/cpp/src/arrow/util/status.h
index 6ddc177a9a5..d1a74250008 100644
--- a/cpp/src/arrow/util/status.h
+++ b/cpp/src/arrow/util/status.h
@@ -63,6 +63,15 @@ namespace arrow {
     if (!_s.ok()) { return _s; } \
   } while (0);
 
+#define RETURN_NOT_OK_ELSE(s, else_) \
+  do {                               \
+    Status _s = (s);                 \
+    if (!_s.ok()) {                  \
+      else_;                         \
+      return _s;                     \
+    }                                \
+  } while (0);
+
 enum class StatusCode : char {
   OK = 0,
   OutOfMemory = 1,
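RETURN_NOT_OK_ELSE generalizes RETURN_NOT_OK with a cleanup expression that runs before the early return; paired with PARQUET_IGNORE_NOT_OK it gives exception-safe unwinding in the writer:

    // From WriteFlatTable above: close the file (ignoring secondary
    // failures) before propagating the failed Status.
    RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));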
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 2173232d4ef..f1becfcf449 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -339,11 +339,17 @@ if (PYARROW_BUILD_TESTS)
     STATIC_LIB ${GTEST_STATIC_LIB})
 endif()
 
+## Parquet
+find_package(Parquet REQUIRED)
+include_directories(SYSTEM ${PARQUET_INCLUDE_DIR})
+
 ## Arrow
 find_package(Arrow REQUIRED)
 include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
 ADD_THIRDPARTY_LIB(arrow
   SHARED_LIB ${ARROW_SHARED_LIB})
+ADD_THIRDPARTY_LIB(arrow_parquet
+  SHARED_LIB ${ARROW_PARQUET_SHARED_LIB})
 
 ############################################################
 # Linker setup
@@ -422,6 +428,7 @@ set(PYARROW_SRCS
 
 set(LINK_LIBS
   arrow
+  arrow_parquet
 )
 
 SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
@@ -442,6 +449,7 @@ set(CYTHON_EXTENSIONS
   array
   config
   error
+  parquet
   scalar
   schema
   table
diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake
index 3d9983849eb..f0b258ed027 100644
--- a/python/cmake_modules/FindArrow.cmake
+++ b/python/cmake_modules/FindArrow.cmake
@@ -42,19 +42,27 @@ find_library(ARROW_LIB_PATH NAMES arrow
   ${ARROW_SEARCH_LIB_PATH}
   NO_DEFAULT_PATH)
 
-if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
+find_library(ARROW_PARQUET_LIB_PATH NAMES arrow_parquet
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
+if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH AND ARROW_PARQUET_LIB_PATH)
   set(ARROW_FOUND TRUE)
   set(ARROW_LIB_NAME libarrow)
+  set(ARROW_PARQUET_LIB_NAME libarrow_parquet)
   set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
   set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
   set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+  set(ARROW_PARQUET_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_PARQUET_LIB_NAME}.a)
+  set(ARROW_PARQUET_SHARED_LIB ${ARROW_LIBS}/${ARROW_PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
 else ()
   set(ARROW_FOUND FALSE)
 endif ()
 
 if (ARROW_FOUND)
   if (NOT Arrow_FIND_QUIETLY)
-    message(STATUS "Found the Arrow library: ${ARROW_LIB_PATH}")
+    message(STATUS "Found the Arrow libraries: ${ARROW_LIB_PATH}, ${ARROW_PARQUET_LIB_PATH}")
   endif ()
 else ()
   if (NOT Arrow_FIND_QUIETLY)
@@ -74,4 +82,6 @@ mark_as_advanced(
   ARROW_LIBS
   ARROW_STATIC_LIB
   ARROW_SHARED_LIB
+  ARROW_PARQUET_STATIC_LIB
+  ARROW_PARQUET_SHARED_LIB
 )
diff --git a/python/conda.recipe/build.sh b/python/conda.recipe/build.sh
index a9d9aedead3..a164c1af518 100644
--- a/python/conda.recipe/build.sh
+++ b/python/conda.recipe/build.sh
@@ -6,6 +6,19 @@ export ARROW_HOME=$PREFIX
 
 cd $RECIPE_DIR
 
+if [ "$(uname)" == "Darwin" ]; then
+  # C++11 finagling for Mac OSX
+  export CC=clang
+  export CXX=clang++
+  export MACOSX_VERSION_MIN="10.7"
+  CXXFLAGS="${CXXFLAGS} -mmacosx-version-min=${MACOSX_VERSION_MIN}"
+  CXXFLAGS="${CXXFLAGS} -stdlib=libc++ -std=c++11"
+  export LDFLAGS="${LDFLAGS} -mmacosx-version-min=${MACOSX_VERSION_MIN}"
+  export LDFLAGS="${LDFLAGS} -stdlib=libc++ -std=c++11"
+  export LINKFLAGS="${LDFLAGS}"
+  export MACOSX_DEPLOYMENT_TARGET=10.7
+fi
+
 echo Setting the compiler...
 if [ `uname` == Linux ]; then
   EXTRA_CMAKE_ARGS=-DCMAKE_SHARED_LINKER_FLAGS=-static-libstdc++
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index a80b3ce8398..619e5ef7e39 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -68,6 +68,9 @@ cdef class Array:
         values = array_format(self, window=10)
         return '{0}\n{1}'.format(type_format, values)
 
+    def equals(Array self, Array other):
+        return self.ap.Equals(other.sp_array)
+
     def __len__(self):
         if self.sp_array.get():
             return self.sp_array.get().length()
diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd
index d226abeda04..97ba0ef2e9f 100644
--- a/python/pyarrow/error.pxd
+++ b/python/pyarrow/error.pxd
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from pyarrow.includes.libarrow cimport CStatus
 from pyarrow.includes.pyarrow cimport *
 
+cdef check_cstatus(const CStatus& status)
 cdef check_status(const Status& status)
diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx
index 3f8d7dd6460..5a6a038a92e 100644
--- a/python/pyarrow/error.pyx
+++ b/python/pyarrow/error.pyx
@@ -15,12 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from pyarrow.includes.libarrow cimport CStatus
 from pyarrow.includes.common cimport c_string
 from pyarrow.compat import frombytes
 
 class ArrowException(Exception):
     pass
 
+cdef check_cstatus(const CStatus& status):
+    if status.ok():
+        return
+
+    cdef c_string c_message = status.ToString()
+    raise ArrowException(frombytes(c_message))
+
 cdef check_status(const Status& status):
     if status.ok():
         return
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index e86d5d77e8b..1f6ecee5105 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -19,6 +19,7 @@
 
 from libc.stdint cimport *
 from libcpp cimport bool as c_bool
+from libcpp.memory cimport shared_ptr, unique_ptr
 from libcpp.string cimport string as c_string
 from libcpp.vector cimport vector
 
@@ -32,11 +33,3 @@ cdef extern from "<Python.h>":
 
 cdef extern from "<Python.h>":
     void Py_XDECREF(PyObject* o)
 
-cdef extern from "<memory>" namespace "std" nogil:
-
-    cdef cppclass shared_ptr[T]:
-        shared_ptr()
-        shared_ptr(T*)
-        T* get()
-        void reset()
-        void reset(T* p)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index b2ef45a347b..90414e3d542 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -72,6 +72,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     cdef cppclass MemoryPool" arrow::MemoryPool":
         int64_t bytes_allocated()
 
+    cdef MemoryPool* default_memory_pool()
+
     cdef cppclass CListType" arrow::ListType"(CDataType):
         CListType(const shared_ptr[CDataType]& value_type)
 
@@ -103,6 +105,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         int32_t null_count()
         Type type_enum()
 
+        c_bool Equals(const shared_ptr[CArray]& arr)
         c_bool IsNull(int i)
 
     cdef cppclass CBooleanArray" arrow::BooleanArray"(CArray):
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index ffdc5d48706..0918344070e 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -18,6 +18,26 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
+
+
+cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
+    cdef cppclass Node:
+        pass
+
+    cdef cppclass GroupNode(Node):
+        pass
+
+    cdef cppclass PrimitiveNode(Node):
+        pass
+
+cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
+    cdef cppclass SchemaDescriptor:
+        shared_ptr[Node] schema()
+        GroupNode* group()
+
+    cdef cppclass ColumnDescriptor:
+        pass
 
 cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
     cdef cppclass ColumnReader:
@@ -48,4 +68,30 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         pass
 
     cdef cppclass ParquetFileReader:
+        # TODO: Some default arguments are missing
+        @staticmethod
+        unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
+
+cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
+    cdef cppclass OutputStream:
         pass
+
+    cdef cppclass LocalFileOutputStream(OutputStream):
+        LocalFileOutputStream(const c_string& path)
+        void Close()
+
+
+cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil:
+    cdef cppclass FileReader:
+        FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
+        CStatus ReadFlatTable(shared_ptr[CTable]* out)
+
+
+cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil:
+    CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                              shared_ptr[CSchema]* out)
+    CStatus ToParquetSchema(const CSchema* arrow_schema,
+                            shared_ptr[SchemaDescriptor]* out)
+
+
+cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil:
+    cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool,
+                                shared_ptr[OutputStream] sink, int64_t chunk_size)
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 622e7d07724..3d5355ebe43 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -19,5 +19,53 @@
 # distutils: language = c++
 # cython: embedsignature = True
 
-from pyarrow.compat import frombytes, tobytes
+from pyarrow.includes.libarrow cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
 from pyarrow.includes.parquet cimport *
+
+from pyarrow.compat import tobytes
+from pyarrow.error cimport check_cstatus
+from pyarrow.table cimport Table
+
+def read_table(filename, columns=None):
+    """
+    Read a Table from Parquet format
+
+    Returns
+    -------
+    table : pyarrow.Table
+    """
+    cdef unique_ptr[FileReader] reader
+    cdef Table table = Table()
+    cdef shared_ptr[CTable] ctable
+
+    # Must be in one expression to avoid calling std::move, which is not
+    # possible in Cython (due to missing rvalue support)
+    reader = unique_ptr[FileReader](new FileReader(default_memory_pool(),
+        ParquetFileReader.OpenFile(tobytes(filename))))
+    check_cstatus(reader.get().ReadFlatTable(&ctable))
+    table.init(ctable)
+    return table
+
+def write_table(table, filename, chunk_size=None):
+    """
+    Write a Table to Parquet format
+
+    Parameters
+    ----------
+    table : pyarrow.Table
+    filename : string
+    chunk_size : int
+        The maximum number of rows in each Parquet RowGroup
+    """
+    cdef Table table_ = table
+    cdef CTable* ctable_ = table_.table
+    cdef shared_ptr[OutputStream] sink
+    cdef int64_t chunk_size_ = 0
+    if chunk_size is None:
+        chunk_size_ = min(ctable_.num_rows(), int(2**16))
+    else:
+        chunk_size_ = chunk_size
+
+    sink.reset(new LocalFileOutputStream(tobytes(filename)))
+    check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink,
+                                 chunk_size_))
diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx
index 22ddf0cf17e..084c304aed2 100644
--- a/python/pyarrow/schema.pyx
+++ b/python/pyarrow/schema.pyx
@@ -201,7 +201,9 @@ def string():
 
 def list_(DataType value_type):
     cdef DataType out = DataType()
-    out.init(shared_ptr[CDataType](new CListType(value_type.sp_type)))
+    cdef shared_ptr[CDataType] list_type
+    list_type.reset(new CListType(value_type.sp_type))
+    out.init(list_type)
     return out
 
 def struct(fields):
@@ -212,12 +214,13 @@ def struct(fields):
         DataType out = DataType()
         Field field
         vector[shared_ptr[CField]] c_fields
+        shared_ptr[CDataType] struct_type
 
     for field in fields:
         c_fields.push_back(field.sp_field)
 
-    out.init(shared_ptr[CDataType](
-        new CStructType(c_fields)))
+    struct_type.reset(new CStructType(c_fields))
+    out.init(struct_type)
     return out
 
 def schema(fields):
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
new file mode 100644
index 00000000000..d92cf4ca656
--- /dev/null
+++ b/python/pyarrow/tests/test_parquet.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.compat import unittest
+import pyarrow as arrow
+import pyarrow.parquet
+
+A = arrow
+
+import numpy as np
+import os.path
+import pandas as pd
+
+import pandas.util.testing as pdt
+
+
+def test_single_pylist_column_roundtrip(tmpdir):
+    for dtype in [int, float]:
+        filename = tmpdir.join('single_{}_column.parquet'.format(dtype.__name__))
+        data = [A.from_pylist(list(map(dtype, range(5))))]
+        table = A.Table.from_arrays(('a',), data, 'table_name')
+        A.parquet.write_table(table, filename.strpath)
+        table_read = pyarrow.parquet.read_table(filename.strpath)
+        for col_written, col_read in zip(table.itercolumns(),
+                                         table_read.itercolumns()):
+            assert col_written.name == col_read.name
+            assert col_read.data.num_chunks == 1
+            data_written = col_written.data.chunk(0)
+            data_read = col_read.data.chunk(0)
+            assert data_written.equals(data_read)
+
+
+def test_pandas_roundtrip(tmpdir):
+    size = 10000
+    df = pd.DataFrame({
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64)
+    })
+    filename = tmpdir.join('pandas_roundtrip.parquet')
+    arrow_table = A.from_pandas_dataframe(df)
+    A.parquet.write_table(arrow_table, filename.strpath)
+    table_read = pyarrow.parquet.read_table(filename.strpath)
+    df_read = table_read.to_pandas()
+    pdt.assert_frame_equal(df, df_read)
diff --git a/python/setup.py b/python/setup.py
index 5f228ed0af2..7edeb914331 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -214,7 +214,7 @@ def get_ext_built(self, name):
         return name + suffix
 
     def get_cmake_cython_names(self):
-        return ['array', 'config', 'error', 'scalar', 'schema', 'table']
+        return ['array', 'config', 'error', 'parquet', 'scalar', 'schema', 'table']
 
     def get_names(self):
         return self._found_names
@@ -242,7 +242,7 @@ def get_outputs(self):
         'clean': clean,
         'build_ext': build_ext
     },
-    install_requires=['cython >= 0.21', 'numpy >= 1.9'],
+    install_requires=['cython >= 0.23', 'numpy >= 1.9'],
    description=DESC,
    license='Apache License, Version 2.0',
    maintainer="Apache Arrow Developers",
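The new tests rely on the tmpdir fixture and therefore assume a pytest runner with numpy and pandas installed, as provisioned by the Travis scripts above; something along the lines of

    py.test python/pyarrow/tests/test_parquet.py

should exercise the full write/read round trip locally.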