PARQUET-435: Change column reader methods to be array-oriented rather than scalar
Column scanning and record reconstruction are independent of the Parquet file format and depend, among other things, on the data structures where the reconstructed data will end up. This is a work in progress, but the basic idea is:

- APIs for reading a batch of repetition levels (`ReadRepetitionLevels`) or definition levels (`ReadDefinitionLevels`) into a preallocated `int16_t*`
- APIs for reading arrays of decoded values into preallocated memory (`ReadValues`)

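The level-reading APIs above decode levels one at a time from an RLE decoder into the caller's preallocated buffer. A self-contained sketch of that pattern (here `FakeLevelDecoder` is a hypothetical stand-in for the library's `RleDecoder`, not its real API):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for RleDecoder: yields levels until exhausted.
class FakeLevelDecoder {
 public:
  explicit FakeLevelDecoder(std::vector<int16_t> levels)
      : levels_(std::move(levels)), pos_(0) {}

  // Returns false once the current page's levels are exhausted.
  bool Get(int16_t* out) {
    if (pos_ == levels_.size()) return false;
    *out = levels_[pos_++];
    return true;
  }

 private:
  std::vector<int16_t> levels_;
  size_t pos_;
};

// Decode up to batch_size levels into preallocated memory; returns the
// number actually decoded. Mirrors the loop behind ReadDefinitionLevels /
// ReadRepetitionLevels.
size_t DecodeMany(FakeLevelDecoder* decoder, int16_t* levels,
                  size_t batch_size) {
  size_t num_decoded = 0;
  for (size_t i = 0; i < batch_size; ++i) {
    if (!decoder->Get(levels + i)) break;
    ++num_decoded;
  }
  return num_decoded;
}
```

A return value smaller than `batch_size` signals that the page's levels ran out mid-batch.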
These methods can only read data within a particular data page. Once you exhaust the data available in the data page (`ReadValues` returns 0), you must call `ReadNewPage`, which returns `true` if there is more data available.
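The drain-then-advance loop this implies can be sketched with a toy paged source (the `FakePagedReader` type and its value type are illustrative assumptions, not the library's API; the real reader decodes typed Parquet values):

```cpp
#include <cstddef>
#include <vector>

// Toy stand-in for a paged column reader: values are split across "pages".
class FakePagedReader {
 public:
  explicit FakePagedReader(std::vector<std::vector<int>> pages)
      : pages_(std::move(pages)), page_(-1), pos_(0) {}

  // Returns true if another data page is available (mirrors ReadNewPage).
  bool ReadNewPage() {
    if (page_ + 1 >= static_cast<int>(pages_.size())) return false;
    ++page_;
    pos_ = 0;
    return true;
  }

  // Reads up to batch_size values from the current page; 0 means exhausted.
  size_t ReadValues(size_t batch_size, int* out) {
    if (page_ < 0) return 0;
    const std::vector<int>& page = pages_[page_];
    size_t n = 0;
    while (n < batch_size && pos_ < page.size()) out[n++] = page[pos_++];
    return n;
  }

 private:
  std::vector<std::vector<int>> pages_;
  int page_;
  size_t pos_;
};

// The read loop the text describes: drain each page with ReadValues until it
// returns 0, then advance with ReadNewPage until no pages remain.
std::vector<int> ReadAll(FakePagedReader* reader, size_t batch_size) {
  std::vector<int> out;
  std::vector<int> buf(batch_size);
  while (reader->ReadNewPage()) {
    size_t n;
    while ((n = reader->ReadValues(batch_size, buf.data())) > 0) {
      out.insert(out.end(), buf.begin(), buf.begin() + n);
    }
  }
  return out;
}
```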

Separately, I added a simple `Scanner` class that emulates the scalar value iteration functionality that existed previously. I used this to reimplement the `DebugPrint` method in `parquet_scanner.cc`. Currently this only works for flat data.
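The idea behind that scalar emulation — refill an internal buffer with a batch read, then hand out one value at a time — can be sketched as follows (`FakeScanner` and its callback-based batch source are hypothetical simplifications, not the actual `Scanner` interface):

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Scalar iteration layered on a batch source: refill an internal buffer via
// a batch read, then yield one value per Next() call.
class FakeScanner {
 public:
  // read_batch(n, out) fills up to n values and returns how many it wrote.
  using BatchReader = std::function<size_t(size_t, int*)>;

  FakeScanner(BatchReader read_batch, size_t batch_size)
      : read_batch_(std::move(read_batch)),
        buffer_(batch_size),
        num_buffered_(0),
        pos_(0) {}

  // Refills the buffer when it is exhausted; false means no values remain.
  bool HasNext() {
    if (pos_ == num_buffered_) {
      num_buffered_ = read_batch_(buffer_.size(), buffer_.data());
      pos_ = 0;
    }
    return num_buffered_ > 0;
  }

  // Only valid after HasNext() returned true.
  int Next() { return buffer_[pos_++]; }

 private:
  BatchReader read_batch_;
  std::vector<int> buffer_;
  size_t num_buffered_;
  size_t pos_;
};
```

Callers are expected to check `HasNext` before each `Next`, which is also where the next batch (or data page) gets pulled in.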

I would like to keep the `ColumnReader` low level and primitive, concerned only with providing access to the raw data in a Parquet file as fast as possible. We can devise separate algorithms for inferring nested record structure by examining the arrays of decoded values and repetition/definition levels. The major benefit of separating raw data access from structure inference is that this can be pipelined with threads: one thread decompresses and decodes values and levels, and another thread can turn batches into a nested record- or column-oriented structure.
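One small piece of that structure inference already falls out of the batch API: since null values are not stored, the number of physical values to decode is the count of definition levels equal to the maximum definition level. A minimal sketch of that tally (function name is illustrative):

```cpp
#include <cstddef>
#include <cstdint>

// Count how many physical values back a run of definition levels: only
// entries at max_definition_level correspond to stored (non-null) values.
size_t CountValuesToRead(const int16_t* def_levels, size_t num_levels,
                         int16_t max_definition_level) {
  size_t values_to_read = 0;
  for (size_t i = 0; i < num_levels; ++i) {
    if (def_levels[i] == max_definition_level) ++values_to_read;
  }
  return values_to_read;
}
```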

Author: Wes McKinney <wes@cloudera.com>

Closes apache#26 from wesm/PARQUET-435 and squashes the following commits:

4bf5cd4 [Wes McKinney] Fix cpplint
852f4ec [Wes McKinney] Address review comments, also be sure to use Scanner::HasNext
7ea261e [Wes McKinney] Add TODO comment
4999719 [Wes McKinney] Make ColumnReader::ReadNewPage private and call HasNext() in ReadBatch
0d2e111 [Wes McKinney] Fix function description. Change #define to constexpr
111ef13 [Wes McKinney] Incorporate review comments and add some better comments
e16f7fd [Wes McKinney] Typo
ef52404 [Wes McKinney] Fix function doc
5e95cda [Wes McKinney] Configurable scanner batch size. Do not use printf in DebugPrint
1b4eca0 [Wes McKinney] New batch read API which reads levels and values in one shot
de4d6b6 [Wes McKinney] Move column_* files into parquet/column folder
aad4a86 [Wes McKinney] Finish refactoring scanner API with shared pointers
4506748 [Wes McKinney] Refactoring, do not have shared_from_this working yet
6489b15 [Wes McKinney] Batch level/value read interface on ColumnReader. Add Scanner class for flat columns. Add a couple smoke unit tests
wesm committed Sep 2, 2018
1 parent d427af3 commit ab5f61b
Showing 11 changed files with 561 additions and 149 deletions.
1 change: 0 additions & 1 deletion cpp/src/parquet/CMakeLists.txt
@@ -18,7 +18,6 @@
# Headers: top level
install(FILES
parquet.h
column_reader.h
reader.h
exception.h
types.h
22 changes: 22 additions & 0 deletions cpp/src/parquet/column/CMakeLists.txt
@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Headers: top level
install(FILES
reader.h
scanner.h
DESTINATION include/parquet/column)
@@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

#include "parquet/column_reader.h"
#include "parquet/column/reader.h"

#include <algorithm>
#include <string>
#include <string.h>

#include "parquet/encodings/encodings.h"
#include "parquet/compression/codec.h"
#include "parquet/encodings/encodings.h"
#include "parquet/thrift/util.h"
#include "parquet/util/input_stream.h"

@@ -42,8 +42,7 @@ ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
schema_(schema),
stream_(std::move(stream)),
num_buffered_values_(0),
num_decoded_values_(0),
buffered_values_offset_(0) {
num_decoded_values_(0) {

switch (metadata->codec) {
case CompressionCodec::UNCOMPRESSED:
@@ -69,7 +68,6 @@ template <int TYPE>
bool TypedColumnReader<TYPE>::ReadNewPage() {
// Loop until we find the next data page.


while (true) {
int bytes_read = 0;
const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
@@ -114,15 +112,25 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
// Read a data page.
num_buffered_values_ = current_page_header_.data_page_header.num_values;

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

// Read definition levels.
if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);

// Temporary hack until schema resolution
max_definition_level_ = 1;

buffer += sizeof(uint32_t);
definition_level_decoder_.reset(
new RleDecoder(buffer, num_definition_bytes, 1));
buffer += num_definition_bytes;
uncompressed_len -= sizeof(uint32_t);
uncompressed_len -= num_definition_bytes;
} else {
// REQUIRED field
max_definition_level_ = 0;
}

// TODO: repetition levels
@@ -165,6 +173,39 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
return true;
}

// ----------------------------------------------------------------------
// Batch read APIs

static size_t DecodeMany(RleDecoder* decoder, int16_t* levels, size_t batch_size) {
size_t num_decoded = 0;

// TODO(wesm): Push this decoding down into RleDecoder itself
for (size_t i = 0; i < batch_size; ++i) {
if (!decoder->Get(levels + i)) {
break;
}
++num_decoded;
}
return num_decoded;
}

size_t ColumnReader::ReadDefinitionLevels(size_t batch_size, int16_t* levels) {
if (!definition_level_decoder_) {
return 0;
}
return DecodeMany(definition_level_decoder_.get(), levels, batch_size);
}

size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) {
if (!repetition_level_decoder_) {
return 0;
}
return DecodeMany(repetition_level_decoder_.get(), levels, batch_size);
}

// ----------------------------------------------------------------------
// Dynamic column reader constructor

std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
const parquet::SchemaElement* element, std::unique_ptr<InputStream> stream) {
switch (metadata->type) {
150 changes: 112 additions & 38 deletions cpp/src/parquet/column_reader.h → cpp/src/parquet/column/reader.h
@@ -18,7 +18,7 @@
#ifndef PARQUET_COLUMN_READER_H
#define PARQUET_COLUMN_READER_H

#include <exception>
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
@@ -48,6 +48,7 @@ struct hash<parquet::Encoding::type> {
namespace parquet_cpp {

class Codec;
class Scanner;

class ColumnReader {
public:
@@ -68,13 +69,14 @@ class ColumnReader {
static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
const parquet::SchemaElement*, std::unique_ptr<InputStream> stream);

virtual bool ReadNewPage() = 0;

// Returns true if there are still values in this column.
bool HasNext() {
if (num_buffered_values_ == 0) {
ReadNewPage();
if (num_buffered_values_ == 0) return false;
// Either there is no data page available yet, or the data page has been
// exhausted
if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
if (!ReadNewPage() || num_buffered_values_ == 0) {
return false;
}
}
return true;
}
@@ -87,9 +89,22 @@
return metadata_;
}

const parquet::SchemaElement* schema() const {
return schema_;
}

protected:
// Reads the next definition and repetition level. Returns true if the value is NULL.
bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level);
virtual bool ReadNewPage() = 0;

// Read multiple definition levels into preallocated memory
//
// Returns the number of decoded definition levels
size_t ReadDefinitionLevels(size_t batch_size, int16_t* levels);

// Read multiple repetition levels into preallocated memory
//
// Returns the number of decoded repetition levels
size_t ReadRepetitionLevels(size_t batch_size, int16_t* levels);

Config config_;

@@ -103,17 +118,28 @@

parquet::PageHeader current_page_header_;

// Not set if field is required.
// Not set if full schema for this field has no optional or repeated elements
std::unique_ptr<RleDecoder> definition_level_decoder_;

// Not set for flat schemas.
std::unique_ptr<RleDecoder> repetition_level_decoder_;

// Temporarily storing this to assist with batch reading
int16_t max_definition_level_;

// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels or encoded values. For
// non-repeated, required columns, this is equal to the number of encoded
// values. For repeated or optional values, there may be fewer data values
// than levels, and this tells you how many encoded levels there are in that
// case.
int num_buffered_values_;

// The number of values from the current data page that have been decoded
// into memory
int num_decoded_values_;
int buffered_values_offset_;
};


// API to read values from a single column. This is the main client facing API.
template <int TYPE>
class TypedColumnReader : public ColumnReader {
@@ -128,20 +154,33 @@ class TypedColumnReader : public ColumnReader {
values_buffer_.resize(config_.batch_size * value_byte_size);
}

// Returns the next value of this type.
// TODO: batchify this interface.
T NextValue(int* def_level, int* rep_level) {
if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return T();
if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
return reinterpret_cast<T*>(&values_buffer_[0])[buffered_values_offset_++];
}
// Read a batch of repetition levels, definition levels, and values from the
// column.
//
// Since null values are not stored in the values, the number of values read
// may be less than the number of repetition and definition levels. With
// nested data this is almost certainly true.
//
// To fully exhaust a row group, you must read batches until the number of
// values read reaches the number of stored values according to the metadata.
//
// This API is the same for both V1 and V2 of the DataPage
//
// @returns: actual number of levels read (see values_read for number of values read)
size_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels,
T* values, size_t* values_read);

private:
void BatchDecode();
typedef Decoder<TYPE> DecoderType;

// Advance to the next data page
virtual bool ReadNewPage();

typedef Decoder<TYPE> DecoderType;
// Read up to batch_size values from the current data page into the
// pre-allocated memory T*
//
// @returns: the number of values read into the out buffer
size_t ReadValues(size_t batch_size, T* out);

// Map of compression type to decompressor object.
std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;
@@ -150,6 +189,59 @@ class TypedColumnReader : public ColumnReader {
std::vector<uint8_t> values_buffer_;
};


template <int TYPE>
inline size_t TypedColumnReader<TYPE>::ReadValues(size_t batch_size, T* out) {
size_t num_decoded = current_decoder_->Decode(out, batch_size);
num_decoded_values_ += num_decoded;
return num_decoded;
}

template <int TYPE>
inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_levels,
int16_t* rep_levels, T* values, size_t* values_read) {
// HasNext invokes ReadNewPage
if (!HasNext()) {
*values_read = 0;
return 0;
}

// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
batch_size = std::min(batch_size, num_buffered_values_);

size_t num_def_levels = 0;
size_t num_rep_levels = 0;

// If the field is required and non-repeated, there are no definition levels
if (definition_level_decoder_) {
num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
}

// Not present for non-repeated fields
if (repetition_level_decoder_) {
num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);

if (num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
}
}

// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
size_t values_to_read = 0;
for (size_t i = 0; i < num_def_levels; ++i) {
if (def_levels[i] == max_definition_level_) {
++values_to_read;
}
}

*values_read = ReadValues(values_to_read, values);

return num_def_levels;
}


typedef TypedColumnReader<parquet::Type::BOOLEAN> BoolReader;
typedef TypedColumnReader<parquet::Type::INT32> Int32Reader;
typedef TypedColumnReader<parquet::Type::INT64> Int64Reader;
@@ -159,24 +251,6 @@ typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
typedef TypedColumnReader<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader;


template <int TYPE>
void TypedColumnReader<TYPE>::BatchDecode() {
buffered_values_offset_ = 0;
T* buf = reinterpret_cast<T*>(&values_buffer_[0]);
int batch_size = config_.batch_size;
num_decoded_values_ = current_decoder_->Decode(buf, batch_size);
}

inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) {
*rep_level = 1;
if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) {
ParquetException::EofException();
}
--num_buffered_values_;
return *def_level == 0;
}

} // namespace parquet_cpp

#endif // PARQUET_COLUMN_READER_H
