Skip to content

Commit

Permalink
PARQUET-505: Column reader should automatically handle large data pages
Browse files Browse the repository at this point in the history
This PR implements
1) PARQUET-505: Column reader should automatically handle large data pages
2) Adds support for Serialization
3) Test case for Serialization and Deserialization
4) Test case for SerializedPageReader and PARQUET-505

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes apache#44 from majetideepak/PARQUET-505 and squashes the following commits:

4f754ba [Deepak Majeti] changed type of page header size defaults
4345812 [Deepak Majeti] PARQUET-505: Column reader should automatically handle large data pages

Change-Id: I1b28296cabc0e6e776271b82ed58e267483911d3
  • Loading branch information
Deepak Majeti authored and julienledem committed Feb 11, 2016
1 parent aa16d6c commit 9466d8c
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 14 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/column/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ install(FILES

ADD_PARQUET_TEST(column-reader-test)
ADD_PARQUET_TEST(levels-test)
ADD_PARQUET_TEST(serialized-page-test)
109 changes: 109 additions & 0 deletions cpp/src/parquet/column/serialized-page-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>

#include <gtest/gtest.h>

#include "parquet/types.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/thrift/util.h"
#include "parquet/column/serialized-page.h"
#include "parquet/column/page.h"
#include "parquet/column/reader.h"
#include "parquet/column/test-util.h"


namespace parquet_cpp {

class TestSerializedPage : public ::testing::Test {
public:
void InitSerializedPageReader(const uint8_t* buffer, size_t header_size,
parquet::CompressionCodec::type codec) {
std::unique_ptr<InputStream> stream;
stream.reset(new InMemoryInputStream(buffer, header_size));
page_reader_.reset(new SerializedPageReader(std::move(stream), codec));
}

protected:
std::unique_ptr<SerializedPageReader> page_reader_;
};

TEST_F(TestSerializedPage, TestLargePageHeaders) {
parquet::PageHeader in_page_header;
parquet::DataPageHeader data_page_header;
parquet::PageHeader out_page_header;
parquet::Statistics stats;
int expected_header_size = 512 * 1024; //512 KB
int stats_size = 256 * 1024; // 256 KB
std::string serialized_buffer;
int num_values = 4141;

InitStats(stats_size, stats);
InitDataPage(stats, data_page_header, num_values);
InitPageHeader(data_page_header, in_page_header);

// Serialize the Page header
ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
expected_header_size));
// check header size is between 256 KB to 16 MB
ASSERT_LE(stats_size, serialized_buffer.length());
ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());

InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
serialized_buffer.length(), parquet::CompressionCodec::UNCOMPRESSED);

std::shared_ptr<Page> current_page = page_reader_->NextPage();
ASSERT_EQ(parquet::PageType::DATA_PAGE, current_page->type());
const DataPage* page = static_cast<const DataPage*>(current_page.get());
ASSERT_EQ(num_values, page->num_values());
}

TEST_F(TestSerializedPage, TestFailLargePageHeaders) {
parquet::PageHeader in_page_header;
parquet::DataPageHeader data_page_header;
parquet::PageHeader out_page_header;
parquet::Statistics stats;
int expected_header_size = 512 * 1024; // 512 KB
int stats_size = 256 * 1024; // 256 KB
int max_header_size = 128 * 1024; // 128 KB
int num_values = 4141;
std::string serialized_buffer;

InitStats(stats_size, stats);
InitDataPage(stats, data_page_header, num_values);
InitPageHeader(data_page_header, in_page_header);

// Serialize the Page header
ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
expected_header_size));
// check header size is between 256 KB to 16 MB
ASSERT_LE(stats_size, serialized_buffer.length());
ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());

InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
serialized_buffer.length(), parquet::CompressionCodec::UNCOMPRESSED);

// Set the max page header size to 128 KB, which is less than the current header size
page_reader_->set_max_page_header_size(max_header_size);

ASSERT_THROW(page_reader_->NextPage(), ParquetException);
}
} // namespace parquet_cpp
40 changes: 30 additions & 10 deletions cpp/src/parquet/column/serialized-page.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace parquet_cpp {
SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
parquet::CompressionCodec::type codec) :
stream_(std::move(stream)) {
max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
switch (codec) {
case parquet::CompressionCodec::UNCOMPRESSED:
break;
Expand All @@ -44,23 +45,42 @@ SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
}
}

// TODO(wesm): this may differ from file to file
static constexpr int DATA_PAGE_SIZE = 64 * 1024;

std::shared_ptr<Page> SerializedPageReader::NextPage() {
// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with
while (true) {
int64_t bytes_read = 0;
const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
if (bytes_read == 0) {
return std::shared_ptr<Page>(nullptr);
}

// This gets used, then set by DeserializeThriftMsg
uint32_t header_size = bytes_read;
DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
int64_t bytes_available = 0;
uint32_t header_size = 0;
const uint8_t* buffer;
uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE;
std::stringstream ss;

// Page headers can be very large because of page statistics
// We try to deserialize a larger buffer progressively
// until a maximum allowed header limit
while (true) {
buffer = stream_->Peek(allowed_page_size, &bytes_available);
if (bytes_available == 0) {
return std::shared_ptr<Page>(nullptr);
}

// This gets used, then set by DeserializeThriftMsg
header_size = bytes_available;
try {
DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
break;
} catch (std::exception& e) {
// Failed to deserialize. Double the allowed page header size and try again
ss << e.what();
allowed_page_size *= 2;
if (allowed_page_size > max_page_header_size_) {
ss << "Deserializing page header failed.\n";
throw ParquetException(ss.str());
}
}
}
// Advance the stream offset
stream_->Read(header_size, &bytes_read);

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/column/serialized-page.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@

namespace parquet_cpp {

// 16 MB is the default maximum page header size
static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024;
// 16 KB is the default expected page header size
static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
// This subclass delimits pages appearing in a serialized stream, each preceded
// by a serialized Thrift parquet::PageHeader indicating the type of each page
// and the page metadata.
Expand All @@ -45,6 +49,10 @@ class SerializedPageReader : public PageReader {
// Implement the PageReader interface
virtual std::shared_ptr<Page> NextPage();

void set_max_page_header_size(uint32_t size) {
max_page_header_size_ = size;
}

private:
std::unique_ptr<InputStream> stream_;

Expand All @@ -54,6 +62,8 @@ class SerializedPageReader : public PageReader {
// Compression codec to use.
std::unique_ptr<Codec> decompressor_;
std::vector<uint8_t> decompression_buffer_;
// Maximum allowed page size
uint32_t max_page_header_size_;
};

} // namespace parquet_cpp
Expand Down
28 changes: 26 additions & 2 deletions cpp/src/parquet/column/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#include <algorithm>
#include <memory>
#include <vector>
#include <string>

#include "parquet/column/page.h"

namespace parquet_cpp {

namespace test {
Expand Down Expand Up @@ -174,9 +174,33 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,

return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(), page_header);
}
} // namespace test

static inline void InitDataPage(const parquet::Statistics& stat,
parquet::DataPageHeader& data_page, int nvalues) {
data_page.encoding = parquet::Encoding::PLAIN;
data_page.definition_level_encoding = parquet::Encoding::RLE;
data_page.repetition_level_encoding = parquet::Encoding::RLE;
data_page.num_values = nvalues;
data_page.__set_statistics(stat);
}

} // namespace test
static inline void InitStats(size_t stat_size, parquet::Statistics& stat) {
std::vector<char> stat_buffer;
stat_buffer.resize(stat_size);
for (int i = 0; i < stat_size; i++) {
(reinterpret_cast<uint8_t*>(stat_buffer.data()))[i] = i % 255;
}
stat.__set_max(std::string(stat_buffer.data(), stat_size));
}

static inline void InitPageHeader(const parquet::DataPageHeader &data_page,
parquet::PageHeader& page_header) {
page_header.__set_data_page_header(data_page);
page_header.uncompressed_page_size = 0;
page_header.compressed_page_size = 0;
page_header.type = parquet::PageType::DATA_PAGE;
}

} // namespace parquet_cpp

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/thrift/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ install(FILES
parquet_constants.h
util.h
DESTINATION include/parquet/thrift)

ADD_PARQUET_TEST(serializer-test)
78 changes: 78 additions & 0 deletions cpp/src/parquet/thrift/serializer-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>

#include <gtest/gtest.h>

#include "parquet/thrift/parquet_types.h"
#include "parquet/thrift/util.h"
#include "parquet/column/page.h"
#include "parquet/column/reader.h"
#include "parquet/column/test-util.h"

using std::string;

namespace parquet_cpp {

class TestThrift : public ::testing::Test {

};

TEST_F(TestThrift, TestSerializerDeserializer) {
parquet::PageHeader in_page_header;
parquet::DataPageHeader data_page_header;
parquet::PageHeader out_page_header;
parquet::Statistics stats;
uint32_t max_header_len = 1024;
uint32_t expected_header_size = 1024;
uint32_t stats_size = 512;
std::string serialized_buffer;
int num_values = 4444;

InitStats(stats_size, stats);
InitDataPage(stats, data_page_header, num_values);
InitPageHeader(data_page_header, in_page_header);

// Serialize the Page header
ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, expected_header_size));
ASSERT_LE(stats_size, serialized_buffer.length());
ASSERT_GE(max_header_len, serialized_buffer.length());

uint32_t header_size = 1024;
// Deserialize the serialized page buffer
ASSERT_NO_THROW(DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
&header_size, &out_page_header));
ASSERT_LE(stats_size, header_size);
ASSERT_GE(max_header_len, header_size);

ASSERT_EQ(parquet::Encoding::PLAIN, out_page_header.data_page_header.encoding);
ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.definition_level_encoding);
ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.repetition_level_encoding);
for(int i = 0; i < stats_size; i++){
EXPECT_EQ(i % 255, (reinterpret_cast<const uint8_t*>
(out_page_header.data_page_header.statistics.max.c_str()))[i]);
}
ASSERT_EQ(parquet::PageType::DATA_PAGE, out_page_header.type);
ASSERT_EQ(num_values, out_page_header.data_page_header.num_values);

}

} // namespace parquet_cpp
30 changes: 28 additions & 2 deletions cpp/src/parquet/thrift/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include <sstream>

#include "parquet/util/logging.h"
#include "parquet/exception.h"

namespace parquet_cpp {
Expand All @@ -34,13 +36,37 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
tproto_factory.getProtocol(tmem_transport);
try {
deserialized_msg->read(tproto.get());
} catch (apache::thrift::protocol::TProtocolException& e) {
throw ParquetException("Couldn't deserialize thrift.", e);
} catch (std::exception& e) {
std::stringstream ss;
ss << "Couldn't deserialize thrift: " << e.what() << "\n";
throw ParquetException(ss.str());
}
uint32_t bytes_left = tmem_transport->available_read();
*len = *len - bytes_left;
}

// Serialize obj into a buffer. The result is returned as a string.
// The arguments are the object to be serialized and
// the expected size of the serialized object
template <class T>
inline std::string SerializeThriftMsg(T* obj, uint32_t len) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
new apache::thrift::transport::TMemoryBuffer(len));
apache::thrift::protocol::TCompactProtocolFactoryT<
apache::thrift::transport::TMemoryBuffer> tproto_factory;
boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
tproto_factory.getProtocol(mem_buffer);
try {
mem_buffer->resetBuffer();
obj->write(tproto.get());
} catch (std::exception& e) {
std::stringstream ss;
ss << "Couldn't serialize thrift: " << e.what() << "\n";
throw ParquetException(ss.str());
}
return mem_buffer->getBufferAsString();
}

} // namespace parquet_cpp

#endif // PARQUET_THRIFT_UTIL_H

0 comments on commit 9466d8c

Please sign in to comment.