From 5d05c2ef35d878f5a2a52a812137483e431144b3 Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Sat, 20 Feb 2016 16:11:41 -0800
Subject: [PATCH] PARQUET-457: Verify page deserialization for GZIP and SNAPPY
 codecs, related refactoring

This also restores passing on the user's `CMAKE_CXX_FLAGS`; dropping those
flags had unfortunately allowed some compiler warnings to creep into our
build.

Author: Wes McKinney

Closes #58 from wesm/PARQUET-457 and squashes the following commits:

4bf12ed [Wes McKinney] * SerializeThriftMsg now writes into an OutputStream.
* Refactor page serialization in advance of compression tests
* Test compression roundtrip on random bytes for snappy and gzip
* Trying LZO compression results in ParquetException
* Don't lose user's CMAKE_CXX_FLAGS
* Remove Travis CI directory caching for now
* Fix gzip memory leak if you do not call inflateEnd, deflateEnd

Change-Id: I44a58ef2d22f8e5064d198d0abeecde7ba4de3cb
---
 cpp/src/parquet/column/levels-test.cc         |   1 +
 cpp/src/parquet/column/page.h                 |  16 +-
 cpp/src/parquet/column/test-util.h            |  29 ---
 cpp/src/parquet/compression/CMakeLists.txt    |   1 +
 cpp/src/parquet/compression/codec.cc          |  47 ++++
 cpp/src/parquet/compression/codec.h           |   8 +
 cpp/src/parquet/compression/gzip-codec.cc     |  31 ++-
 .../parquet/encodings/plain-encoding-test.cc  |   8 +-
 cpp/src/parquet/file/file-deserialize-test.cc | 232 ++++++++++++++----
 cpp/src/parquet/file/reader-internal.cc       |  30 +--
 .../parquet/schema/schema-descriptor-test.cc  |   1 +
 cpp/src/parquet/thrift/CMakeLists.txt         |   2 -
 cpp/src/parquet/thrift/serializer-test.cc     |  75 ------
 cpp/src/parquet/thrift/util.h                 |  11 +-
 cpp/src/parquet/util/macros.h                 |   5 +
 cpp/src/parquet/util/output.h                 |   4 +
 16 files changed, 317 insertions(+), 184 deletions(-)
 create mode 100644 cpp/src/parquet/compression/codec.cc
 delete mode 100644 cpp/src/parquet/thrift/serializer-test.cc

diff --git a/cpp/src/parquet/column/levels-test.cc b/cpp/src/parquet/column/levels-test.cc
index 62188db822bef..0e3c20f222a8b 100644
--- a/cpp/src/parquet/column/levels-test.cc
+++ b/cpp/src/parquet/column/levels-test.cc
@@ -16,6 +16,7 @@
 // under the License.
#include +#include #include #include diff --git a/cpp/src/parquet/column/page.h b/cpp/src/parquet/column/page.h index 3308a1c7349d8..916fd121c884f 100644 --- a/cpp/src/parquet/column/page.h +++ b/cpp/src/parquet/column/page.h @@ -24,6 +24,7 @@ #include #include +#include #include "parquet/types.h" @@ -93,13 +94,26 @@ class DataPage : public Page { return definition_level_encoding_; } + // DataPageHeader::statistics::max field, if it was set + const uint8_t* max() const { + return reinterpret_cast(max_.c_str()); + } + + // DataPageHeader::statistics::min field, if it was set + const uint8_t* min() const { + return reinterpret_cast(min_.c_str()); + } + private: int32_t num_values_; Encoding::type encoding_; Encoding::type definition_level_encoding_; Encoding::type repetition_level_encoding_; - // TODO(wesm): parquet::DataPageHeader.statistics + // So max/min can be populated privately + friend class SerializedPageReader; + std::string max_; + std::string min_; }; diff --git a/cpp/src/parquet/column/test-util.h b/cpp/src/parquet/column/test-util.h index b346fc2b37444..b12f340ec3d7b 100644 --- a/cpp/src/parquet/column/test-util.h +++ b/cpp/src/parquet/column/test-util.h @@ -32,7 +32,6 @@ // Depended on by SerializedPageReader test utilities for now #include "parquet/encodings/plain-encoding.h" -#include "parquet/thrift/util.h" #include "parquet/util/input.h" namespace parquet_cpp { @@ -195,34 +194,6 @@ static std::shared_ptr MakeDataPage(const std::vector& values, } // namespace test -// Utilities for testing the SerializedPageReader internally - -static inline void InitDataPage(const parquet::Statistics& stat, - parquet::DataPageHeader& data_page, int32_t nvalues) { - data_page.encoding = parquet::Encoding::PLAIN; - data_page.definition_level_encoding = parquet::Encoding::RLE; - data_page.repetition_level_encoding = parquet::Encoding::RLE; - data_page.num_values = nvalues; - data_page.__set_statistics(stat); -} - -static inline void InitStats(size_t stat_size, parquet::Statistics& stat) { - std::vector stat_buffer; - stat_buffer.resize(stat_size); - for (int i = 0; i < stat_size; i++) { - (reinterpret_cast(stat_buffer.data()))[i] = i % 255; - } - stat.__set_max(std::string(stat_buffer.data(), stat_size)); -} - -static inline void InitPageHeader(const parquet::DataPageHeader &data_page, - parquet::PageHeader& page_header) { - page_header.__set_data_page_header(data_page); - page_header.uncompressed_page_size = 0; - page_header.compressed_page_size = 0; - page_header.type = parquet::PageType::DATA_PAGE; -} - } // namespace parquet_cpp #endif // PARQUET_COLUMN_TEST_UTIL_H diff --git a/cpp/src/parquet/compression/CMakeLists.txt b/cpp/src/parquet/compression/CMakeLists.txt index 2c0b67c3e4839..f0ee110a64e96 100644 --- a/cpp/src/parquet/compression/CMakeLists.txt +++ b/cpp/src/parquet/compression/CMakeLists.txt @@ -16,6 +16,7 @@ # under the License. add_library(parquet_compression STATIC + codec.cc lz4-codec.cc snappy-codec.cc gzip-codec.cc diff --git a/cpp/src/parquet/compression/codec.cc b/cpp/src/parquet/compression/codec.cc new file mode 100644 index 0000000000000..60d308eb375ab --- /dev/null +++ b/cpp/src/parquet/compression/codec.cc @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "parquet/compression/codec.h" +#include "parquet/exception.h" +#include "parquet/types.h" + +namespace parquet_cpp { + +std::unique_ptr Codec::Create(Compression::type codec_type) { + std::unique_ptr result; + switch (codec_type) { + case Compression::UNCOMPRESSED: + break; + case Compression::SNAPPY: + result.reset(new SnappyCodec()); + break; + case Compression::GZIP: + result.reset(new GZipCodec()); + break; + case Compression::LZO: + ParquetException::NYI("LZO codec not implemented"); + break; + default: + ParquetException::NYI("Unrecognized codec"); + break; + } + return result; +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/compression/codec.h b/cpp/src/parquet/compression/codec.h index 8fc4ada13a74c..bc73f02f3e074 100644 --- a/cpp/src/parquet/compression/codec.h +++ b/cpp/src/parquet/compression/codec.h @@ -19,16 +19,21 @@ #define PARQUET_COMPRESSION_CODEC_H #include +#include #include #include "parquet/exception.h" +#include "parquet/types.h" namespace parquet_cpp { class Codec { public: virtual ~Codec() {} + + static std::unique_ptr Create(Compression::type codec); + virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) = 0; @@ -80,6 +85,7 @@ class GZipCodec : public Codec { }; explicit GZipCodec(Format format = GZIP); + virtual ~GZipCodec(); virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer); @@ -109,6 +115,8 @@ class GZipCodec : public Codec { // perform the refactoring then void InitCompressor(); void InitDecompressor(); + void EndCompressor(); + void EndDecompressor(); bool compressor_initialized_; bool decompressor_initialized_; }; diff --git a/cpp/src/parquet/compression/gzip-codec.cc b/cpp/src/parquet/compression/gzip-codec.cc index 6ec2726030f88..f48fdad766b49 100644 --- a/cpp/src/parquet/compression/gzip-codec.cc +++ b/cpp/src/parquet/compression/gzip-codec.cc @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. 
-#include "parquet/compression/codec.h" - #include #include +#include + +#include "parquet/compression/codec.h" +#include "parquet/exception.h" namespace parquet_cpp { @@ -40,7 +42,13 @@ GZipCodec::GZipCodec(Format format) : decompressor_initialized_(false) { } +GZipCodec::~GZipCodec() { + EndCompressor(); + EndDecompressor(); +} + void GZipCodec::InitCompressor() { + EndDecompressor(); memset(&stream_, 0, sizeof(stream_)); int ret; @@ -58,12 +66,18 @@ void GZipCodec::InitCompressor() { } compressor_initialized_ = true; - decompressor_initialized_ = false; +} + +void GZipCodec::EndCompressor() { + if (compressor_initialized_) { + (void)deflateEnd(&stream_); + } + compressor_initialized_ = false; } void GZipCodec::InitDecompressor() { + EndCompressor(); memset(&stream_, 0, sizeof(stream_)); - int ret; // Initialize to run either deflate or zlib/gzip format @@ -71,11 +85,16 @@ void GZipCodec::InitDecompressor() { if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg)); } - - compressor_initialized_ = false; decompressor_initialized_ = true; } +void GZipCodec::EndDecompressor() { + if (decompressor_initialized_) { + (void)inflateEnd(&stream_); + } + decompressor_initialized_ = false; +} + void GZipCodec::Decompress(int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { if (!decompressor_initialized_) { diff --git a/cpp/src/parquet/encodings/plain-encoding-test.cc b/cpp/src/parquet/encodings/plain-encoding-test.cc index b8ef13b9d01da..5091dc83b0dc9 100644 --- a/cpp/src/parquet/encodings/plain-encoding-test.cc +++ b/cpp/src/parquet/encodings/plain-encoding-test.cc @@ -17,11 +17,13 @@ #include #include +#include #include #include #include +#include "parquet/schema/descriptor.h" #include "parquet/encodings/plain-encoding.h" #include "parquet/types.h" #include "parquet/schema/types.h" @@ -80,7 +82,7 @@ class EncodeDecode{ void generate_data() { // seed the prng so failure is deterministic - random_numbers(num_values_, 0.5, draws_); + random_numbers(num_values_, 0, draws_); } void encode_decode(ColumnDescriptor *d) { @@ -141,7 +143,7 @@ void EncodeDecode::generate_data() { int max_byte_array_len = 12 + sizeof(uint32_t); size_t nbytes = num_values_ * max_byte_array_len; data_buffer_.resize(nbytes); - random_byte_array(num_values_, 0.5, data_buffer_.data(), draws_, + random_byte_array(num_values_, 0, data_buffer_.data(), draws_, max_byte_array_len); } @@ -160,7 +162,7 @@ void EncodeDecode::generate_data() { size_t nbytes = num_values_ * flba_length; data_buffer_.resize(nbytes); ASSERT_EQ(nbytes, data_buffer_.size()); - random_fixed_byte_array(num_values_, 0.5, data_buffer_.data(), flba_length, draws_); + random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_); } template<> diff --git a/cpp/src/parquet/file/file-deserialize-test.cc b/cpp/src/parquet/file/file-deserialize-test.cc index e90889dd7fdff..cfb3e8686fa93 100644 --- a/cpp/src/parquet/file/file-deserialize-test.cc +++ b/cpp/src/parquet/file/file-deserialize-test.cc @@ -20,92 +20,224 @@ #include #include #include +#include #include #include #include +#include #include "parquet/column/page.h" -#include "parquet/column/test-util.h" - +#include "parquet/compression/codec.h" +#include "parquet/exception.h" #include "parquet/file/reader-internal.h" #include "parquet/thrift/parquet_types.h" #include "parquet/thrift/util.h" #include "parquet/types.h" #include "parquet/util/input.h" +#include "parquet/util/output.h" 
+#include "parquet/util/test-common.h" namespace parquet_cpp { -class TestSerializedPage : public ::testing::Test { + +// Adds page statistics occupying a certain amount of bytes (for testing very +// large page headers) +static inline void AddDummyStats(size_t stat_size, + parquet::DataPageHeader& data_page) { + + std::vector stat_bytes(stat_size); + // Some non-zero value + std::fill(stat_bytes.begin(), stat_bytes.end(), 1); + data_page.statistics.__set_max(std::string( + reinterpret_cast(stat_bytes.data()), stat_size)); + data_page.__isset.statistics = true; +} + +class TestPageSerde : public ::testing::Test { public: - void InitSerializedPageReader(const uint8_t* buffer, size_t header_size, - Compression::type codec) { + void SetUp() { + data_page_header_.encoding = parquet::Encoding::PLAIN; + data_page_header_.definition_level_encoding = parquet::Encoding::RLE; + data_page_header_.repetition_level_encoding = parquet::Encoding::RLE; + + ResetStream(); + } + + void InitSerializedPageReader(Compression::type codec = + Compression::UNCOMPRESSED) { + EndStream(); std::unique_ptr stream; - stream.reset(new InMemoryInputStream(buffer, header_size)); + stream.reset(new InMemoryInputStream(out_buffer_.data(), + out_buffer_.size())); page_reader_.reset(new SerializedPageReader(std::move(stream), codec)); } + void WriteDataPageHeader(int max_serialized_len = 1024, + int32_t uncompressed_size = 0, int32_t compressed_size = 0) { + // Simplifying writing serialized data page headers which may or may not + // have meaningful data associated with them + + // Serialize the Page header + uint32_t serialized_len = max_serialized_len; + page_header_.__set_data_page_header(data_page_header_); + page_header_.uncompressed_page_size = uncompressed_size; + page_header_.compressed_page_size = compressed_size; + page_header_.type = parquet::PageType::DATA_PAGE; + + ASSERT_NO_THROW(SerializeThriftMsg(&page_header_, max_serialized_len, + out_stream_.get())); + } + + void ResetStream() { + out_buffer_.resize(0); + out_stream_.reset(new InMemoryOutputStream()); + } + + void EndStream() { + out_stream_->Transfer(&out_buffer_); + } + protected: + std::unique_ptr out_stream_; + + // TODO(wesm): Owns the results of the output stream. 
To be refactored + std::vector out_buffer_; + std::unique_ptr page_reader_; + parquet::PageHeader page_header_; + parquet::DataPageHeader data_page_header_; }; -TEST_F(TestSerializedPage, TestLargePageHeaders) { - parquet::PageHeader in_page_header; - parquet::DataPageHeader data_page_header; +void CheckDataPageHeader(const parquet::DataPageHeader expected, + const Page* page) { + ASSERT_EQ(PageType::DATA_PAGE, page->type()); + + const DataPage* data_page = static_cast(page); + ASSERT_EQ(expected.num_values, data_page->num_values()); + ASSERT_EQ(expected.encoding, data_page->encoding()); + ASSERT_EQ(expected.definition_level_encoding, + data_page->definition_level_encoding()); + ASSERT_EQ(expected.repetition_level_encoding, + data_page->repetition_level_encoding()); + + if (expected.statistics.__isset.max) { + ASSERT_EQ(0, memcmp(expected.statistics.max.c_str(), + data_page->max(), expected.statistics.max.length())); + } + if (expected.statistics.__isset.min) { + ASSERT_EQ(0, memcmp(expected.statistics.min.c_str(), + data_page->min(), expected.statistics.min.length())); + } +} + +TEST_F(TestPageSerde, DataPage) { parquet::PageHeader out_page_header; - parquet::Statistics stats; - int expected_header_size = 512 * 1024; //512 KB + + int stats_size = 512; + AddDummyStats(stats_size, data_page_header_); + data_page_header_.num_values = 4444; + + WriteDataPageHeader(); + InitSerializedPageReader(); + std::shared_ptr current_page = page_reader_->NextPage(); + CheckDataPageHeader(data_page_header_, current_page.get()); +} + +TEST_F(TestPageSerde, TestLargePageHeaders) { int stats_size = 256 * 1024; // 256 KB - std::string serialized_buffer; - int num_values = 4141; + AddDummyStats(stats_size, data_page_header_); - InitStats(stats_size, stats); - InitDataPage(stats, data_page_header, num_values); - InitPageHeader(data_page_header, in_page_header); + // Any number to verify metadata roundtrip + data_page_header_.num_values = 4141; - // Serialize the Page header - ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, - expected_header_size)); - // check header size is between 256 KB to 16 MB - ASSERT_LE(stats_size, serialized_buffer.length()); - ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); + int max_header_size = 512 * 1024; // 512 KB + WriteDataPageHeader(max_header_size); + ASSERT_GE(max_header_size, out_stream_->Tell()); - InitSerializedPageReader(reinterpret_cast(serialized_buffer.c_str()), - serialized_buffer.length(), Compression::UNCOMPRESSED); + // check header size is between 256 KB to 16 MB + ASSERT_LE(stats_size, out_stream_->Tell()); + ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell()); + InitSerializedPageReader(); std::shared_ptr current_page = page_reader_->NextPage(); - ASSERT_EQ(PageType::DATA_PAGE, current_page->type()); - const DataPage* page = static_cast(current_page.get()); - ASSERT_EQ(num_values, page->num_values()); + CheckDataPageHeader(data_page_header_, current_page.get()); } -TEST_F(TestSerializedPage, TestFailLargePageHeaders) { - parquet::PageHeader in_page_header; - parquet::DataPageHeader data_page_header; - parquet::PageHeader out_page_header; - parquet::Statistics stats; - int expected_header_size = 512 * 1024; // 512 KB +TEST_F(TestPageSerde, TestFailLargePageHeaders) { int stats_size = 256 * 1024; // 256 KB - int max_header_size = 128 * 1024; // 128 KB - int num_values = 4141; - std::string serialized_buffer; - - InitStats(stats_size, stats); - InitDataPage(stats, data_page_header, num_values); - 
InitPageHeader(data_page_header, in_page_header); + AddDummyStats(stats_size, data_page_header_); // Serialize the Page header - ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, - expected_header_size)); - // check header size is between 256 KB to 16 MB - ASSERT_LE(stats_size, serialized_buffer.length()); - ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length()); + int max_header_size = 512 * 1024; // 512 KB + WriteDataPageHeader(max_header_size); + ASSERT_GE(max_header_size, out_stream_->Tell()); - InitSerializedPageReader(reinterpret_cast(serialized_buffer.c_str()), - serialized_buffer.length(), Compression::UNCOMPRESSED); - - // Set the max page header size to 128 KB, which is less than the current header size - page_reader_->set_max_page_header_size(max_header_size); + int smaller_max_size = 128 * 1024; + ASSERT_LE(smaller_max_size, out_stream_->Tell()); + InitSerializedPageReader(); + // Set the max page header size to 128 KB, which is less than the current + // header size + page_reader_->set_max_page_header_size(smaller_max_size); ASSERT_THROW(page_reader_->NextPage(), ParquetException); } + +TEST_F(TestPageSerde, Compression) { + Compression::type codec_types[2] = {Compression::GZIP, Compression::SNAPPY}; + + // This is a dummy number + data_page_header_.num_values = 32; + + int num_pages = 10; + + std::vector > faux_data; + faux_data.resize(num_pages); + for (int i = 0; i < num_pages; ++i) { + // The pages keep getting larger + int page_size = (i + 1) * 64; + test::random_bytes(page_size, 0, &faux_data[i]); + } + for (auto codec_type : codec_types) { + std::unique_ptr codec = Codec::Create(codec_type); + + std::vector buffer; + for (int i = 0; i < num_pages; ++i) { + const uint8_t* data = faux_data[i].data(); + size_t data_size = faux_data[i].size(); + + int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data); + buffer.resize(max_compressed_size); + + int64_t actual_size = codec->Compress(data_size, data, + max_compressed_size, &buffer[0]); + + WriteDataPageHeader(1024, data_size, actual_size); + out_stream_->Write(buffer.data(), actual_size); + } + + InitSerializedPageReader(codec_type); + + std::shared_ptr page; + const DataPage* data_page; + for (int i = 0; i < num_pages; ++i) { + size_t data_size = faux_data[i].size(); + page = page_reader_->NextPage(); + data_page = static_cast(page.get()); + ASSERT_EQ(data_size, data_page->size()); + ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size)); + } + + ResetStream(); + } +} + +TEST_F(TestPageSerde, LZONotSupported) { + // Must await PARQUET-530 + int data_size = 1024; + std::vector faux_data(data_size); + WriteDataPageHeader(1024, data_size, data_size); + out_stream_->Write(faux_data.data(), data_size); + ASSERT_THROW(InitSerializedPageReader(Compression::LZO), ParquetException); +} + } // namespace parquet_cpp diff --git a/cpp/src/parquet/file/reader-internal.cc b/cpp/src/parquet/file/reader-internal.cc index 47092a5fc0a80..0a93b00932278 100644 --- a/cpp/src/parquet/file/reader-internal.cc +++ b/cpp/src/parquet/file/reader-internal.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "parquet/column/page.h" @@ -40,22 +41,10 @@ namespace parquet_cpp { // assembled in a serialized stream for storing in a Parquet files SerializedPageReader::SerializedPageReader(std::unique_ptr stream, - Compression::type codec) : + Compression::type codec_type) : stream_(std::move(stream)) { max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; - // TODO(wesm): 
add GZIP after PARQUET-456 - switch (codec) { - case Compression::UNCOMPRESSED: - break; - case Compression::SNAPPY: - decompressor_.reset(new SnappyCodec()); - break; - case Compression::LZO: - decompressor_.reset(new Lz4Codec()); - break; - default: - ParquetException::NYI("Reading compressed data"); - } + decompressor_ = Codec::Create(codec_type); } std::shared_ptr SerializedPageReader::NextPage() { @@ -126,11 +115,22 @@ std::shared_ptr SerializedPageReader::NextPage() { } else if (current_page_header_.type == parquet::PageType::DATA_PAGE) { const parquet::DataPageHeader& header = current_page_header_.data_page_header; - return std::make_shared(buffer, uncompressed_len, + auto page = std::make_shared(buffer, uncompressed_len, header.num_values, FromThrift(header.encoding), FromThrift(header.definition_level_encoding), FromThrift(header.repetition_level_encoding)); + + if (header.__isset.statistics) { + const parquet::Statistics stats = header.statistics; + if (stats.__isset.max) { + page->max_ = stats.max; + } + if (stats.__isset.min) { + page->min_ = stats.min; + } + } + return page; } else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) { const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; bool is_compressed = header.__isset.is_compressed? header.is_compressed : false; diff --git a/cpp/src/parquet/schema/schema-descriptor-test.cc b/cpp/src/parquet/schema/schema-descriptor-test.cc index c63df54466b93..83b136d49693a 100644 --- a/cpp/src/parquet/schema/schema-descriptor-test.cc +++ b/cpp/src/parquet/schema/schema-descriptor-test.cc @@ -27,6 +27,7 @@ #include "parquet/exception.h" #include "parquet/schema/descriptor.h" #include "parquet/schema/types.h" +#include "parquet/types.h" using std::string; using std::vector; diff --git a/cpp/src/parquet/thrift/CMakeLists.txt b/cpp/src/parquet/thrift/CMakeLists.txt index 29b8ef89d3b7a..f43c2a57f7118 100644 --- a/cpp/src/parquet/thrift/CMakeLists.txt +++ b/cpp/src/parquet/thrift/CMakeLists.txt @@ -44,5 +44,3 @@ add_custom_command( COMMENT "Running thrift compiler on parquet.thrift" VERBATIM ) - -ADD_PARQUET_TEST(serializer-test) diff --git a/cpp/src/parquet/thrift/serializer-test.cc b/cpp/src/parquet/thrift/serializer-test.cc deleted file mode 100644 index 756fd10000a79..0000000000000 --- a/cpp/src/parquet/thrift/serializer-test.cc +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include - -#include -#include -#include - -#include "parquet/column/test-util.h" -#include "parquet/thrift/parquet_types.h" -#include "parquet/thrift/util.h" - -using std::string; - -namespace parquet_cpp { - -class TestThrift : public ::testing::Test { - -}; - -TEST_F(TestThrift, TestSerializerDeserializer) { - parquet::PageHeader in_page_header; - parquet::DataPageHeader data_page_header; - parquet::PageHeader out_page_header; - parquet::Statistics stats; - uint32_t max_header_len = 1024; - uint32_t expected_header_size = 1024; - uint32_t stats_size = 512; - std::string serialized_buffer; - int num_values = 4444; - - InitStats(stats_size, stats); - InitDataPage(stats, data_page_header, num_values); - InitPageHeader(data_page_header, in_page_header); - - // Serialize the Page header - ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, expected_header_size)); - ASSERT_LE(stats_size, serialized_buffer.length()); - ASSERT_GE(max_header_len, serialized_buffer.length()); - - uint32_t header_size = 1024; - // Deserialize the serialized page buffer - ASSERT_NO_THROW(DeserializeThriftMsg(reinterpret_cast(serialized_buffer.c_str()), - &header_size, &out_page_header)); - ASSERT_LE(stats_size, header_size); - ASSERT_GE(max_header_len, header_size); - - ASSERT_EQ(parquet::Encoding::PLAIN, out_page_header.data_page_header.encoding); - ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.definition_level_encoding); - ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.repetition_level_encoding); - for(int i = 0; i < stats_size; i++){ - EXPECT_EQ(i % 255, (reinterpret_cast - (out_page_header.data_page_header.statistics.max.c_str()))[i]); - } - ASSERT_EQ(parquet::PageType::DATA_PAGE, out_page_header.type); - ASSERT_EQ(num_values, out_page_header.data_page_header.num_values); - -} - -} // namespace parquet_cpp diff --git a/cpp/src/parquet/thrift/util.h b/cpp/src/parquet/thrift/util.h index 8c3419745863b..5f2982052011d 100644 --- a/cpp/src/parquet/thrift/util.h +++ b/cpp/src/parquet/thrift/util.h @@ -18,8 +18,9 @@ #include #include "parquet/exception.h" -#include "parquet/util/logging.h" #include "parquet/thrift/parquet_types.h" +#include "parquet/util/logging.h" +#include "parquet/util/output.h" namespace parquet_cpp { @@ -77,7 +78,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali // The arguments are the object to be serialized and // the expected size of the serialized object template -inline std::string SerializeThriftMsg(T* obj, uint32_t len) { +inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { boost::shared_ptr mem_buffer( new apache::thrift::transport::TMemoryBuffer(len)); apache::thrift::protocol::TCompactProtocolFactoryT< @@ -92,7 +93,11 @@ inline std::string SerializeThriftMsg(T* obj, uint32_t len) { ss << "Couldn't serialize thrift: " << e.what() << "\n"; throw ParquetException(ss.str()); } - return mem_buffer->getBufferAsString(); + + uint8_t* out_buffer; + uint32_t out_length; + mem_buffer->getBuffer(&out_buffer, &out_length); + out->Write(out_buffer, out_length); } } // namespace parquet_cpp diff --git a/cpp/src/parquet/util/macros.h b/cpp/src/parquet/util/macros.h index 7b301d67c51c7..d2211732076ef 100644 --- a/cpp/src/parquet/util/macros.h +++ b/cpp/src/parquet/util/macros.h @@ -20,6 +20,11 @@ // Useful macros from elsewhere +// From Google gutil +#define DISALLOW_COPY_AND_ASSIGN(TypeName) \ + TypeName(const TypeName&) = delete; \ + void operator=(const TypeName&) = delete + // 
----------------------------------------------------------------------
 // From googletest
diff --git a/cpp/src/parquet/util/output.h b/cpp/src/parquet/util/output.h
index be25abd9b273f..68a09e28bd1f7 100644
--- a/cpp/src/parquet/util/output.h
+++ b/cpp/src/parquet/util/output.h
@@ -21,6 +21,8 @@
 #include
 #include
 
+#include "parquet/util/macros.h"
+
 namespace parquet_cpp {
 
 // ----------------------------------------------------------------------
@@ -63,6 +65,8 @@ class InMemoryOutputStream : public OutputStream {
   std::vector buffer_;
   int64_t size_;
   int64_t capacity_;
+
+  DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
 };
 
 } // namespace parquet_cpp
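
Usage sketch (illustrative, not part of the applied diff): SerializeThriftMsg now writes into an OutputStream instead of returning a std::string, so a caller serializes into an InMemoryOutputStream and hands the bytes off with Transfer, the same pattern the TestPageSerde fixture uses. This is a minimal sketch of that roundtrip; the PageHeaderRoundtrip name, the std::vector<uint8_t> element type for the transferred buffer, and the 1024-byte TMemoryBuffer size hint are assumptions for illustration, not taken verbatim from the diff.

// Illustrative sketch only. Assumes the interfaces shown in this patch:
// SerializeThriftMsg(T*, uint32_t, OutputStream*), DeserializeThriftMsg,
// and InMemoryOutputStream::Transfer. The uint8_t buffer element type is
// an assumption.
#include <cassert>
#include <cstdint>
#include <vector>

#include "parquet/thrift/parquet_types.h"
#include "parquet/thrift/util.h"
#include "parquet/util/output.h"

namespace parquet_cpp {

void PageHeaderRoundtrip() {
  parquet::DataPageHeader data_header;
  data_header.num_values = 128;
  data_header.encoding = parquet::Encoding::PLAIN;
  data_header.definition_level_encoding = parquet::Encoding::RLE;
  data_header.repetition_level_encoding = parquet::Encoding::RLE;

  parquet::PageHeader header;
  header.__set_data_page_header(data_header);
  header.uncompressed_page_size = 0;
  header.compressed_page_size = 0;
  header.type = parquet::PageType::DATA_PAGE;

  // 1024 bytes sizes thrift's TMemoryBuffer; plenty for a header that
  // carries no large statistics
  InMemoryOutputStream out_stream;
  SerializeThriftMsg(&header, 1024, &out_stream);

  std::vector<uint8_t> serialized;
  out_stream.Transfer(&serialized);

  parquet::PageHeader decoded;
  uint32_t bytes_available = static_cast<uint32_t>(serialized.size());
  DeserializeThriftMsg(serialized.data(), &bytes_available, &decoded);
  assert(decoded.data_page_header.num_values == 128);
}

}  // namespace parquet_cpp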
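
The new Codec::Create factory together with MaxCompressedLen/Compress/Decompress is what the TestPageSerde.Compression test drives end to end. Below is a minimal sketch of that roundtrip under the same assumptions as the test (the Codec interface as it appears in codec.h above, Compress returning the number of bytes written); the input bytes here are arbitrary rather than produced by test::random_bytes.

// Illustrative sketch only, based on the Codec interface visible in this
// patch and the way the Compression test calls it.
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

#include "parquet/compression/codec.h"
#include "parquet/types.h"

int main() {
  using parquet_cpp::Codec;
  using parquet_cpp::Compression;

  // Arbitrary, compressible input; the unit test uses random bytes instead
  std::vector<uint8_t> input(4096, 7);

  // Codec::Create returns a null pointer for UNCOMPRESSED and throws
  // ParquetException for LZO and unrecognized codecs
  std::unique_ptr<Codec> codec = Codec::Create(Compression::SNAPPY);

  // Compress into a buffer sized by the codec's worst-case estimate
  int64_t max_len = codec->MaxCompressedLen(input.size(), input.data());
  std::vector<uint8_t> compressed(max_len);
  int64_t actual_len =
      codec->Compress(input.size(), input.data(), max_len, compressed.data());

  // The caller must know the uncompressed size up front; in the file format
  // this comes from PageHeader::uncompressed_page_size
  std::vector<uint8_t> roundtrip(input.size());
  codec->Decompress(actual_len, compressed.data(), roundtrip.size(),
      roundtrip.data());

  assert(std::memcmp(input.data(), roundtrip.data(), input.size()) == 0);
  return 0;
}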
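
The gzip leak fix comes down to a zlib discipline: every successful deflateInit2/inflateInit2 on the shared z_stream must eventually be matched by deflateEnd/inflateEnd, which EndCompressor/EndDecompressor and the new GZipCodec destructor now guarantee (and InitCompressor/InitDecompressor end the opposite mode first, since they reuse a single z_stream). A standalone zlib sketch of the same pairing follows; ScopedDeflateStream is a hypothetical RAII guard used only for illustration, not a class added by the patch.

// Illustrative zlib-only sketch of paired init/end calls.
#include <cstring>
#include <stdexcept>

#include <zlib.h>

class ScopedDeflateStream {
 public:
  explicit ScopedDeflateStream(int level = Z_DEFAULT_COMPRESSION) {
    // Zero the struct so zalloc/zfree/opaque default to Z_NULL, as
    // GZipCodec::InitCompressor does before deflateInit2
    std::memset(&stream_, 0, sizeof(stream_));
    if (deflateInit(&stream_, level) != Z_OK) {
      throw std::runtime_error("deflateInit failed");
    }
    initialized_ = true;
  }

  // Without this call zlib's internally allocated deflate state is leaked,
  // which is the bug the patch fixes in GZipCodec
  ~ScopedDeflateStream() {
    if (initialized_) {
      (void)deflateEnd(&stream_);
    }
  }

  z_stream* get() { return &stream_; }

 private:
  z_stream stream_;
  bool initialized_ = false;
};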