diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc index 4a988dacdd9aa..a8a2270b8b7bf 100644 --- a/cpp/src/parquet/arrow/record_reader.cc +++ b/cpp/src/parquet/arrow/record_reader.cc @@ -570,6 +570,16 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl { DecoderType* current_decoder_; + // Initialize repetition and definition level decoders on the next data page. + template + int64_t InitializeLevelDecoders(const std::shared_ptr page, + const Encoding::type repetition_level_encoding, + const Encoding::type definition_level_encoding); + + template + void InitializeDataDecoder(const std::shared_ptr page, + const int64_t levels_bytes); + // Advance to the next data page bool ReadNewPage() override; @@ -724,11 +734,95 @@ inline void TypedRecordReader::ConfigureDictionary(const DictionaryPage* current_decoder_ = decoders_[encoding].get(); } +// If the data page includes repetition and definition levels, we +// initialize the level decoders and return the number of encoded level bytes. +// The return value helps determine the number of bytes in the encoded data. +template +template +int64_t TypedRecordReader::InitializeLevelDecoders( + const std::shared_ptr page, const Encoding::type repetition_level_encoding, + const Encoding::type definition_level_encoding) { + // Read a data page. + num_buffered_values_ = page->num_values(); + + // Have not decoded any values from the data page yet + num_decoded_values_ = 0; + + const uint8_t* buffer = page->data(); + int64_t levels_byte_size = 0; + + // Data page Layout: Repetition Levels - Definition Levels - encoded values. + // Levels are encoded as rle or bit-packed. + // Init repetition levels + if (descr_->max_repetition_level() > 0) { + int64_t rep_levels_bytes = repetition_level_decoder_.SetData( + repetition_level_encoding, descr_->max_repetition_level(), + static_cast(num_buffered_values_), buffer); + buffer += rep_levels_bytes; + levels_byte_size += rep_levels_bytes; + } + // TODO figure a way to set max_definition_level_ to 0 + // if the initial value is invalid + + // Init definition levels + if (descr_->max_definition_level() > 0) { + int64_t def_levels_bytes = definition_level_decoder_.SetData( + definition_level_encoding, descr_->max_definition_level(), + static_cast(num_buffered_values_), buffer); + levels_byte_size += def_levels_bytes; + } + + return levels_byte_size; +} + +// Get a decoder object for this page or create a new decoder if this is the +// first page with this encoding. +template +template +void TypedRecordReader::InitializeDataDecoder(const std::shared_ptr page, + const int64_t levels_byte_size) { + const uint8_t* buffer = page->data() + levels_byte_size; + const int64_t data_size = page->size() - levels_byte_size; + + Encoding::type encoding = page->encoding(); + + if (IsDictionaryIndexEncoding(encoding)) { + encoding = Encoding::RLE_DICTIONARY; + } + + auto it = decoders_.find(static_cast(encoding)); + if (it != decoders_.end()) { + if (encoding == Encoding::RLE_DICTIONARY) { + DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); + } + current_decoder_ = it->second.get(); + } else { + switch (encoding) { + case Encoding::PLAIN: { + std::shared_ptr decoder(new PlainDecoder(descr_)); + decoders_[static_cast(encoding)] = decoder; + current_decoder_ = decoder.get(); + break; + } + case Encoding::RLE_DICTIONARY: + throw ParquetException("Dictionary page must be before data page."); + + case Encoding::DELTA_BINARY_PACKED: + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: + ParquetException::NYI("Unsupported encoding"); + + default: + throw ParquetException("Unknown encoding type."); + } + } + current_decoder_->SetData(static_cast(num_buffered_values_), buffer, + static_cast(data_size)); +} + template bool TypedRecordReader::ReadNewPage() { // Loop until we find the next data page. - const uint8_t* buffer; - while (true) { current_page_ = pager_->NextPage(); if (!current_page_) { @@ -740,79 +834,18 @@ bool TypedRecordReader::ReadNewPage() { ConfigureDictionary(static_cast(current_page_.get())); continue; } else if (current_page_->type() == PageType::DATA_PAGE) { - const DataPage* page = static_cast(current_page_.get()); - - // Read a data page. - num_buffered_values_ = page->num_values(); - - // Have not decoded any values from the data page yet - num_decoded_values_ = 0; - - buffer = page->data(); - - // If the data page includes repetition and definition levels, we - // initialize the level decoder and subtract the encoded level bytes from - // the page size to determine the number of bytes in the encoded data. - int64_t data_size = page->size(); - - // Data page Layout: Repetition Levels - Definition Levels - encoded values. - // Levels are encoded as rle or bit-packed. - // Init repetition levels - if (descr_->max_repetition_level() > 0) { - int64_t rep_levels_bytes = repetition_level_decoder_.SetData( - page->repetition_level_encoding(), descr_->max_repetition_level(), - static_cast(num_buffered_values_), buffer); - buffer += rep_levels_bytes; - data_size -= rep_levels_bytes; - } - // TODO figure a way to set max_definition_level_ to 0 - // if the initial value is invalid - - // Init definition levels - if (descr_->max_definition_level() > 0) { - int64_t def_levels_bytes = definition_level_decoder_.SetData( - page->definition_level_encoding(), descr_->max_definition_level(), - static_cast(num_buffered_values_), buffer); - buffer += def_levels_bytes; - data_size -= def_levels_bytes; - } - - // Get a decoder object for this page or create a new decoder if this is the - // first page with this encoding. - Encoding::type encoding = page->encoding(); - - if (IsDictionaryIndexEncoding(encoding)) { - encoding = Encoding::RLE_DICTIONARY; - } - - auto it = decoders_.find(static_cast(encoding)); - if (it != decoders_.end()) { - if (encoding == Encoding::RLE_DICTIONARY) { - DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); - } - current_decoder_ = it->second.get(); - } else { - switch (encoding) { - case Encoding::PLAIN: { - std::shared_ptr decoder(new PlainDecoder(descr_)); - decoders_[static_cast(encoding)] = decoder; - current_decoder_ = decoder.get(); - break; - } - case Encoding::RLE_DICTIONARY: - throw ParquetException("Dictionary page must be before data page."); - - case Encoding::DELTA_BINARY_PACKED: - case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: - ParquetException::NYI("Unsupported encoding"); - - default: - throw ParquetException("Unknown encoding type."); - } - } - current_decoder_->SetData(static_cast(num_buffered_values_), buffer, - static_cast(data_size)); + const auto page = std::static_pointer_cast(current_page_); + const int64_t levels_byte_size = InitializeLevelDecoders( + page, page->repetition_level_encoding(), page->definition_level_encoding()); + InitializeDataDecoder(page, levels_byte_size); + return true; + } else if (current_page_->type() == PageType::DATA_PAGE_V2) { + const auto page = std::static_pointer_cast(current_page_); + // Repetition and definition levels are always encoded using RLE encoding + // in the DataPageV2 format. + const int64_t levels_byte_size = + InitializeLevelDecoders(page, Encoding::RLE, Encoding::RLE); + InitializeDataDecoder(page, levels_byte_size); return true; } else { // We don't know what this page type is. We're allowed to skip non-data