From 50c8c16281623ece516f7d87cba771b89e91954d Mon Sep 17 00:00:00 2001 From: Deepak Majeti Date: Fri, 16 Sep 2016 09:01:00 +0200 Subject: [PATCH] PARQUET-689: C++: Compress DataPages eagerly Author: Deepak Majeti Closes #162 from majetideepak/PARQUET-689 and squashes the following commits: 46f04fb [Deepak Majeti] Clang format 73dfcf9 [Deepak Majeti] Compress Data Pages early Change-Id: I2a1067f5692458aa52fcf0656200a014c75505ba --- cpp/src/parquet/column/page.h | 19 ++++++++++++++++++- cpp/src/parquet/column/writer.cc | 8 +++++--- cpp/src/parquet/column/writer.h | 4 ++-- cpp/src/parquet/file/writer-internal.cc | 6 +++--- cpp/src/parquet/file/writer-internal.h | 18 +++++++++--------- 5 files changed, 37 insertions(+), 18 deletions(-) diff --git a/cpp/src/parquet/column/page.h b/cpp/src/parquet/column/page.h index c06d3de9a93ae..1de60131d2402 100644 --- a/cpp/src/parquet/column/page.h +++ b/cpp/src/parquet/column/page.h @@ -95,6 +95,21 @@ class DataPage : public Page { std::string min_; }; +class CompressedDataPage : public DataPage { + public: + CompressedDataPage(const std::shared_ptr& buffer, int32_t num_values, + Encoding::type encoding, Encoding::type definition_level_encoding, + Encoding::type repetition_level_encoding, int64_t uncompressed_size) + : DataPage(buffer, num_values, encoding, definition_level_encoding, + repetition_level_encoding), + uncompressed_size_(uncompressed_size) {} + + int64_t uncompressed_size() const { return uncompressed_size_; } + + private: + int64_t uncompressed_size_; +}; + class DataPageV2 : public Page { public: DataPageV2(const std::shared_ptr& buffer, int32_t num_values, int32_t num_nulls, @@ -176,9 +191,11 @@ class PageWriter { // page limit virtual void Close(bool has_dictionary, bool fallback) = 0; - virtual int64_t WriteDataPage(const DataPage& page) = 0; + virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0; virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; + + virtual std::shared_ptr Compress(const std::shared_ptr& buffer) = 0; }; } // namespace parquet diff --git a/cpp/src/parquet/column/writer.cc b/cpp/src/parquet/column/writer.cc index 1fbea62d81afc..bfbd0c5d24919 100644 --- a/cpp/src/parquet/column/writer.cc +++ b/cpp/src/parquet/column/writer.cc @@ -116,8 +116,10 @@ void ColumnWriter::AddDataPage() { memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size()); uncompressed_ptr += definition_levels->size(); memcpy(uncompressed_ptr, values->data(), values->size()); - DataPage page( - uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE); + + std::shared_ptr compressed_data = pager_->Compress(uncompressed_data); + CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE, + Encoding::RLE, uncompressed_size); // Write the page to OutputStream eagerly if there is no dictionary or // if dictionary encoding has fallen back to PLAIN @@ -133,7 +135,7 @@ void ColumnWriter::AddDataPage() { num_buffered_encoded_values_ = 0; } -void ColumnWriter::WriteDataPage(const DataPage& page) { +void ColumnWriter::WriteDataPage(const CompressedDataPage& page) { int64_t bytes_written = pager_->WriteDataPage(page); total_bytes_written_ += bytes_written; } diff --git a/cpp/src/parquet/column/writer.h b/cpp/src/parquet/column/writer.h index 4b2a02192ee0c..3a54cbb6ab814 100644 --- a/cpp/src/parquet/column/writer.h +++ b/cpp/src/parquet/column/writer.h @@ -72,7 +72,7 @@ class PARQUET_EXPORT ColumnWriter { void AddDataPage(); // Serializes Data Pages - void WriteDataPage(const DataPage& page); + void WriteDataPage(const CompressedDataPage& page); // Write multiple definition levels void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels); @@ -128,7 +128,7 @@ class PARQUET_EXPORT ColumnWriter { std::unique_ptr definition_levels_sink_; std::unique_ptr repetition_levels_sink_; - std::vector data_pages_; + std::vector data_pages_; private: void InitSinks(); diff --git a/cpp/src/parquet/file/writer-internal.cc b/cpp/src/parquet/file/writer-internal.cc index 2d396b7f6de07..05aefb9554d3e 100644 --- a/cpp/src/parquet/file/writer-internal.cc +++ b/cpp/src/parquet/file/writer-internal.cc @@ -66,9 +66,9 @@ std::shared_ptr SerializedPageWriter::Compress( return compression_buffer_; } -int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) { - int64_t uncompressed_size = page.size(); - std::shared_ptr compressed_data = Compress(page.buffer()); +int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) { + int64_t uncompressed_size = page.uncompressed_size(); + std::shared_ptr compressed_data = page.buffer(); format::DataPageHeader data_page_header; data_page_header.__set_num_values(page.num_values()); diff --git a/cpp/src/parquet/file/writer-internal.h b/cpp/src/parquet/file/writer-internal.h index e6364e9de477c..2095154a074e0 100644 --- a/cpp/src/parquet/file/writer-internal.h +++ b/cpp/src/parquet/file/writer-internal.h @@ -40,10 +40,18 @@ class SerializedPageWriter : public PageWriter { virtual ~SerializedPageWriter() {} - int64_t WriteDataPage(const DataPage& page) override; + int64_t WriteDataPage(const CompressedDataPage& page) override; int64_t WriteDictionaryPage(const DictionaryPage& page) override; + /** + * Compress a buffer. + * + * This method may return compression_buffer_ and thus the resulting memory + * is only valid until the next call to Compress(). + */ + std::shared_ptr Compress(const std::shared_ptr& buffer) override; + void Close(bool has_dictionary, bool fallback) override; private: @@ -58,14 +66,6 @@ class SerializedPageWriter : public PageWriter { // Compression codec to use. std::unique_ptr compressor_; std::shared_ptr compression_buffer_; - - /** - * Compress a buffer. - * - * This method may return compression_buffer_ and thus the resulting memory - * is only valid until the next call to Compress(). - */ - std::shared_ptr Compress(const std::shared_ptr& buffer); }; // RowGroupWriter::Contents implementation for the Parquet file specification