Skip to content

Commit

Permalink
Minor enhancement DataPage read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed May 20, 2024
1 parent dcdf4e6 commit cca042a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 19 deletions.
13 changes: 7 additions & 6 deletions cpp/src/parquet/column_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <memory>
#include <optional>
#include <string>
#include <utility>

#include "parquet/statistics.h"
#include "parquet/types.h"
Expand Down Expand Up @@ -75,13 +76,13 @@ class DataPage : public Page {
protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, int64_t uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics(),
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
uncompressed_size_(uncompressed_size),
statistics_(statistics),
statistics_(std::move(statistics)),
first_row_index_(std::move(first_row_index)) {}

int32_t num_values_;
Expand All @@ -97,10 +98,10 @@ class DataPageV1 : public DataPage {
DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics(),
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size,
statistics, std::move(first_row_index)),
std::move(statistics), std::move(first_row_index)),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

Expand All @@ -119,10 +120,10 @@ class DataPageV2 : public DataPage {
int32_t num_rows, Encoding::type encoding,
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
int64_t uncompressed_size, bool is_compressed = false,
const EncodedStatistics& statistics = EncodedStatistics(),
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size,
statistics, std::move(first_row_index)),
std::move(statistics), std::move(first_row_index)),
num_nulls_(num_nulls),
num_rows_(num_rows),
definition_levels_byte_length_(definition_levels_byte_length),
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -534,11 +534,11 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

return std::make_shared<DataPageV1>(page_buffer, header.num_values,
LoadEnumSafe(&header.encoding),
LoadEnumSafe(&header.definition_level_encoding),
LoadEnumSafe(&header.repetition_level_encoding),
uncompressed_len, data_page_statistics);
return std::make_shared<DataPageV1>(
page_buffer, header.num_values, LoadEnumSafe(&header.encoding),
LoadEnumSafe(&header.definition_level_encoding),
LoadEnumSafe(&header.repetition_level_encoding), uncompressed_len,
std::move(data_page_statistics));
} else if (page_type == PageType::DATA_PAGE_V2) {
++page_ordinal_;
const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
Expand All @@ -565,7 +565,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer, header.num_values, header.num_nulls, header.num_rows,
LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
header.repetition_levels_byte_length, uncompressed_len, is_compressed,
data_page_statistics);
std::move(data_page_statistics));
} else {
throw ParquetException(
"Internal error, we have already skipped non-data pages in ShouldSkipPage()");
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ class SerializedPageWriter : public PageWriter {

int64_t WriteDataPage(const DataPage& page) override {
const int64_t uncompressed_size = page.uncompressed_size();
if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Uncompressed data page size overflows INT32_MAX. Size:",
uncompressed_size);
}

std::shared_ptr<Buffer> compressed_data = page.buffer();
const uint8_t* output_data_buffer = compressed_data->data();
int64_t output_data_len = compressed_data->size();
Expand All @@ -399,11 +404,6 @@ class SerializedPageWriter : public PageWriter {
}

format::PageHeader page_header;

if (uncompressed_size > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Uncompressed data page size overflows INT32_MAX. Size:",
uncompressed_size);
}
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));

Expand Down Expand Up @@ -1018,13 +1018,13 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, page_stats, first_row_index);
uncompressed_size, std::move(page_stats), first_row_index);
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);

data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, page_stats, first_row_index);
uncompressed_size, std::move(page_stats), first_row_index);
WriteDataPage(page);
}
}
Expand Down

0 comments on commit cca042a

Please sign in to comment.